From d77698290a4803dfc88b147ef4061fbefed107ec Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Tue, 8 Jul 2025 14:43:40 -0700 Subject: [PATCH 1/3] Added logic for dropped item count --- .../opentelemetry/exporter/_constants.py | 4 +- .../monitor/opentelemetry/exporter/_utils.py | 38 ++++ .../exporter/statsbeat/_customer_statsbeat.py | 72 ++++++- .../statsbeat/test_customer_statsbeat.py | 182 +++++++++++++++++- 4 files changed, 289 insertions(+), 7 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py index 55e32cb6c984..88168a1e7602 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py @@ -10,6 +10,7 @@ ) from azure.core import CaseInsensitiveEnumMeta + # Environment variables _APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL = "APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL" @@ -64,7 +65,6 @@ _MESSAGE_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Message" _REQUEST_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Request" _REMOTE_DEPENDENCY_ENVELOPE_NAME = "Microsoft.ApplicationInsights.RemoteDependency" -_REMOTE_DEPENDENCY_ENVELOPE_DATA = "Microsoft.ApplicationInsights.RemoteDependencyData" _EVENT_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Event" _PAGE_VIEW_ENVELOPE_NAME = "Microsoft.ApplicationInsights.PageView" _PERFORMANCE_COUNTER_ENVELOPE_NAME = "Microsoft.ApplicationInsights.PerformanceCounter" @@ -167,7 +167,7 @@ def __init__(self, language: str, version: str, compute_type: str): _TYPE_MAP = { _EVENT_ENVELOPE_NAME: _CUSTOM_EVENT, _METRIC_ENVELOPE_NAME: _CUSTOM_METRIC, - _REMOTE_DEPENDENCY_ENVELOPE_DATA: _DEPENDENCY, + _REMOTE_DEPENDENCY_ENVELOPE_NAME: _DEPENDENCY, _EXCEPTION_ENVELOPE_NAME: _EXCEPTION, _PAGE_VIEW_ENVELOPE_NAME: _PAGE_VIEW, _MESSAGE_ENVELOPE_NAME: _TRACE, diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py index f2570b88120e..d693395b9ac2 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py @@ -400,3 +400,41 @@ def get_compute_type(): if _is_on_aks(): return _RP_Names.AKS.value return _RP_Names.UNKNOWN.value + +def categorize_exception_message(exception_message: str) -> str: + message = exception_message.lower() + exception_categories = { + "timeout_exception": ["timeout", "timed out"], + "network_exception": ["network", "connection"], + "auth_exception": ["auth", "authentication", "forbidden"], + "parse_exception": ["parse", "parsing", "invalid"], + "storage_exception": ["disk", "storage", "file"], + "memory_exception": ["memory", "out of memory"], + } + for category, keywords in exception_categories.items(): + if any(keyword in message for keyword in keywords): + return category + return "other_exception" + +def categorize_status_code(status_code: int) -> str: + status_map = { + 400: "bad_request", + 401: "unauthorized", + 402: "daily quota exceeded", + 403: "forbidden", + 404: "not_found", + 408: "request_timeout", + 413: "payload_too_large", + 429: "too_many_requests", + 500: "internal_server_error", + 502: "bad_gateway", + 503: "service_unavailable", + 504: "gateway_timeout", + } + if status_code in status_map: + return status_map[status_code] + if 400 <= status_code < 500: + return "client_error_4xx" + if 500 <= status_code < 600: + return "server_error_5xx" + return f"status_{status_code}" diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py index 2dd23630de7e..8e2ae77678bc 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py @@ -6,7 +6,7 @@ metrics that track the usage and performance of the Azure Monitor OpenTelemetry Exporter. """ -from typing import List, Dict, Any, Iterable +from typing import List, Dict, Any, Iterable, Optional import os from opentelemetry.metrics import CallbackOptions, Observation @@ -17,7 +17,7 @@ _APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW, _DEFAULT_STATS_SHORT_EXPORT_INTERVAL, CustomerStatsbeatProperties, - #DropCode, + DropCode, #RetryCode, CustomerStatsbeatMetricName, _CUSTOMER_STATSBEAT_LANGUAGE, @@ -27,6 +27,8 @@ from azure.monitor.opentelemetry.exporter._utils import ( Singleton, get_compute_type, + categorize_status_code, + categorize_exception_message, ) from azure.monitor.opentelemetry.exporter import VERSION @@ -34,6 +36,7 @@ class _CustomerStatsbeatTelemetryCounters: def __init__(self): self.total_item_success_count: Dict[str, Any] = {} + self.total_item_drop_count: Dict[str, Dict[str, Dict[str, int]]] = {} class CustomerStatsbeatMetrics(metaclass=Singleton): def __init__(self, options): @@ -66,20 +69,51 @@ def __init__(self, options): description="Tracks successful telemetry items sent to Azure Monitor", callbacks=[self._item_success_callback] ) + self._dropped_gauge = self._customer_statsbeat_meter.create_observable_gauge( + name=CustomerStatsbeatMetricName.ITEM_DROP_COUNT.value, + description="Tracks dropped telemetry items sent to Azure Monitor", + callbacks=[self._item_drop_callback] + ) def count_successful_items(self, count: int, telemetry_type: str) -> None: if not self._is_enabled or count <= 0: return + if telemetry_type in self._counters.total_item_success_count: self._counters.total_item_success_count[telemetry_type] += count else: self._counters.total_item_success_count[telemetry_type] = count + def count_dropped_items( + self, count: int, telemetry_type: str, drop_code: DropCode, + exception_message: Optional[str] = None + ) -> None: + if not self._is_enabled or count <= 0: + return + + # Get or create the drop_code map for this telemetry_type + if telemetry_type not in self._counters.total_item_drop_count: + self._counters.total_item_drop_count[telemetry_type] = {} + drop_code_map = self._counters.total_item_drop_count[telemetry_type] + + # Get or create the reason map for this drop_code + if drop_code not in drop_code_map: + drop_code_map[drop_code] = {} + reason_map = drop_code_map[drop_code] + + # Generate a low-cardinality, informative reason description + reason = self._get_drop_reason(drop_code, exception_message) + + # Update the count for this reason + current_count = reason_map.get(reason, 0) + reason_map[reason] = current_count + 1 + def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument if not getattr(self, "_is_enabled", False): return [] observations: List[Observation] = [] + for telemetry_type, count in self._counters.total_item_success_count.items(): attributes = { "language": self._customer_properties.language, @@ -90,3 +124,37 @@ def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observati observations.append(Observation(count, dict(attributes))) return observations + + def _item_drop_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument + if not getattr(self, "_is_enabled", False): + return [] + observations: List[Observation] = [] + for telemetry_type, drop_code_map in self._counters.total_item_drop_count.items(): + for drop_code, reason_map in drop_code_map.items(): + for reason, count in reason_map.items(): + attributes = { + "language": self._customer_properties.language, + "version": self._customer_properties.version, + "compute_type": self._customer_properties.compute_type, + "drop.code": str(drop_code), + "drop.reason": reason, + "telemetry_type": telemetry_type + } + observations.append(Observation(count, dict(attributes))) + + return observations + + def _get_drop_reason(self, drop_code: DropCode, exception_message: Optional[str] = None) -> str: + if isinstance(drop_code, int): + return categorize_status_code(drop_code) + + if drop_code == DropCode.CLIENT_EXCEPTION: + return categorize_exception_message(exception_message) if exception_message else "unknown_exception" + + drop_code_reasons = { + DropCode.CLIENT_READONLY: "readonly_mode", + DropCode.CLIENT_STALE_DATA: "stale_data", + DropCode.CLIENT_PERSISTENCE_CAPACITY: "persistence_full", + } + + return drop_code_reasons.get(drop_code, "unknown_reason") diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py index 7f4c1683366d..d09edb54d332 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py @@ -34,6 +34,20 @@ from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter from azure.monitor.opentelemetry.exporter.statsbeat._state import _REQUESTS_MAP +def convert_envelope_names_to_base_type(envelope_name): + if not envelope_name.endswith("Data"): + return envelope_name + "Data" + return envelope_name +class EnvelopeTypeMapper: + @classmethod + def get_type_map(cls): + return { + convert_envelope_names_to_base_type(envelope_name): telemetry_type + for envelope_name, telemetry_type in _TYPE_MAP.items() + } + +_BASE_TYPE_MAP = EnvelopeTypeMapper.get_type_map() + class TestCustomerStatsbeat(unittest.TestCase): """Tests for CustomerStatsbeatMetrics.""" def setUp(self): @@ -70,7 +84,7 @@ def tearDown(self): pass # Ignore shutdown errors CustomerStatsbeatMetrics._instance = None - def test_successful_item_count(self): + def test_successful_items_count(self): successful_dependencies = 0 metrics = self.mock_options.metrics @@ -90,7 +104,7 @@ def patched_transmit(self_exporter, envelopes): if base_data and hasattr(base_data, "success"): if base_data.success: envelope_name = "Microsoft.ApplicationInsights." + envelope.data.base_type - metrics.count_successful_items(1, _TYPE_MAP.get(envelope_name, _UNKNOWN)) + metrics.count_successful_items(1, _BASE_TYPE_MAP.get(envelope_name, _UNKNOWN)) return ExportResult.SUCCESS @@ -121,7 +135,7 @@ def patched_transmit(self_exporter, envelopes): "db.operation": "query", "net.peer.name": "test-db-server", "net.peer.port": 3306, - "http.status_code": 200 if success else random.choice([400, 500, 503]) + "http.status_code": 200 if success else random.choice([401, 402, 403, 405, 500, 503]) } ) as span: span.set_status(trace.Status( @@ -142,3 +156,165 @@ def patched_transmit(self_exporter, envelopes): successful_dependencies, f"Expected {successful_dependencies} successful dependencies, got {actual_count}" ) + + def test_dropped_items_count(self): + dropped_items = 0 + + metrics = self.mock_options.metrics + metrics._counters.total_item_drop_count.clear() + + exporter = AzureMonitorTraceExporter(connection_string=self.mock_options.connection_string) + + def patched_transmit(self_exporter, envelopes): + self.mock_options.transmit_called[0] = True + + for envelope in envelopes: + if not hasattr(envelope, "data") or not envelope.data: + continue + + envelope_name = "Microsoft.ApplicationInsights." + envelope.data.base_type + telemetry_type = _BASE_TYPE_MAP.get(envelope_name, _UNKNOWN) + + should_fail = random.choice([True, False]) + if should_fail: + nonlocal dropped_items + dropped_items += 1 + + from azure.monitor.opentelemetry.exporter._constants import DropCode + from azure.monitor.opentelemetry.exporter._utils import categorize_exception_message, categorize_status_code + + # Randomly choose between HTTP status code failures and client exceptions + failure_type = random.choice(["http_status", "client_exception"]) + + if failure_type == "http_status": + # HTTP status code failures from Application Insights + status_codes = [400, 401, 402, 403, 404, 408, 429, 500, 502, 503, 504] + status_code = random.choice(status_codes) + + categorized_reason = categorize_status_code(status_code) + + # Use the status code directly as the drop code (integer) + metrics.count_dropped_items(1, telemetry_type, status_code, None) + else: + # Client exception scenarios + exception_scenarios = [ + "Connection timed out after 30 seconds", + "Request timed out after 60 seconds", # Another timeout - should aggregate + "Operation timed out", # Another timeout - should aggregate + "Network connection failed: Connection refused", + "Network error: Host unreachable", # Another network error - should aggregate + "Authentication failed: Invalid credentials", + "Auth error: Token expired", # Another auth error - should aggregate + "Failed to parse response: Invalid JSON format", + "Parse error: Malformed XML", # Another parse error - should aggregate + "Disk storage full: Cannot write to file", + "Out of memory: Cannot allocate buffer", + "Memory allocation failed", # Another memory error - should aggregate + "HTTP 401 Unauthorized", + "HTTP 401 Invalid token", # Another 401 - should aggregate + "HTTP 403 Forbidden", + "HTTP 500 Internal Server Error", + "HTTP 500 Database error", # Another 500 - should aggregate + "HTTP 503 Service Unavailable", + "Unknown transmission error", + "Unexpected error occurred" # Another unknown error - should aggregate + ] + + exception_message = random.choice(exception_scenarios) + + if "HTTP" in exception_message and any(char.isdigit() for char in exception_message): + try: + status_code = int(''.join(filter(str.isdigit, exception_message))) + categorized_reason = categorize_status_code(status_code) + except ValueError: + categorized_reason = categorize_exception_message(exception_message) + else: + categorized_reason = categorize_exception_message(exception_message) + + + metrics.count_dropped_items(1, telemetry_type, DropCode.CLIENT_EXCEPTION, exception_message) + + continue + + return ExportResult.SUCCESS + + exporter._transmit = types.MethodType(patched_transmit, exporter) + + resource = Resource.create({"service.name": "exception-test", "service.instance.id": "test-instance"}) + trace_provider = TracerProvider(resource=resource) + + processor = SimpleSpanProcessor(exporter) + trace_provider.add_span_processor(processor) + + tracer = trace_provider.get_tracer(__name__) + + total_items = random.randint(10, 20) + + for i in range(total_items): + span_type = random.choice(["client", "server"]) + + if span_type == "client": + # Client spans generate RemoteDependencyData + with tracer.start_as_current_span( + name=f"dependency-{i}", + kind=trace.SpanKind.CLIENT, + attributes={ + "db.system": "mysql", + "db.name": "test_db", + "db.operation": "query", + "net.peer.name": "test-db-server", + "net.peer.port": 3306, + } + ) as span: + span.set_status(trace.Status(trace.StatusCode.OK)) + time.sleep(0.01) + else: + # Server spans generate RequestData + with tracer.start_as_current_span( + name=f"GET /api/endpoint-{i}", + kind=trace.SpanKind.SERVER, + attributes={ + "http.method": "GET", + "http.url": f"https://example.com/api/endpoint-{i}", + "http.route": f"/api/endpoint-{i}", + "http.status_code": 200, + "http.scheme": "https", + "http.host": "example.com", + } + ) as span: + span.set_status(trace.Status(trace.StatusCode.OK)) + time.sleep(0.01) + + trace_provider.force_flush() + + self.metrics_instance = metrics + + self.assertTrue(self.mock_options.transmit_called[0], "Exporter _transmit method was not called") + + actual_dropped_count = 0 + + + category_totals = {} + http_status_totals = {} + client_exception_totals = {} + + for telemetry_type, drop_code_data in metrics._counters.total_item_drop_count.items(): + for drop_code, reason_map in drop_code_data.items(): + if isinstance(reason_map, dict): + for reason, count in reason_map.items(): + actual_dropped_count += count + category_totals[reason] = category_totals.get(reason, 0) + count + + # Separate HTTP status codes from client exceptions + if isinstance(drop_code, int): + http_status_totals[reason] = http_status_totals.get(reason, 0) + count + else: + client_exception_totals[reason] = client_exception_totals.get(reason, 0) + count + else: + actual_dropped_count += reason_map + + self.assertEqual( + actual_dropped_count, + dropped_items, + f"Expected {dropped_items} dropped items, got {actual_dropped_count}" + ) \ No newline at end of file From 0b3028bc55778a834cd42824e8adedc232d0bb59 Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Wed, 9 Jul 2025 13:08:51 -0700 Subject: [PATCH 2/3] Addressed review comments --- .../opentelemetry/exporter/_constants.py | 4 +- .../monitor/opentelemetry/exporter/_utils.py | 38 ------ .../exporter/statsbeat/_customer_statsbeat.py | 18 +-- .../exporter/statsbeat/_utils.py | 23 ++++ .../statsbeat/test_customer_statsbeat.py | 121 ++++++++++++------ 5 files changed, 115 insertions(+), 89 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py index 88168a1e7602..95f115a29ccf 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py @@ -3,6 +3,7 @@ # cSpell:disable from enum import Enum +from typing import Union from opentelemetry.semconv.metrics import MetricInstruments from opentelemetry.semconv.metrics.http_metrics import ( HTTP_CLIENT_REQUEST_DURATION, @@ -10,7 +11,6 @@ ) from azure.core import CaseInsensitiveEnumMeta - # Environment variables _APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL = "APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL" @@ -145,6 +145,8 @@ class DropCode(str, Enum, metaclass=CaseInsensitiveEnumMeta): CLIENT_PERSISTENCE_CAPACITY = "CLIENT_PERSISTENCE_CAPACITY" UNKNOWN = "UNKNOWN" +DropCodeType = Union[DropCode, int] + class RetryCode(str, Enum, metaclass=CaseInsensitiveEnumMeta): CLIENT_TIMEOUT = "CLIENT_TIMEOUT" UNKNOWN = "UNKNOWN" diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py index d693395b9ac2..f2570b88120e 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py @@ -400,41 +400,3 @@ def get_compute_type(): if _is_on_aks(): return _RP_Names.AKS.value return _RP_Names.UNKNOWN.value - -def categorize_exception_message(exception_message: str) -> str: - message = exception_message.lower() - exception_categories = { - "timeout_exception": ["timeout", "timed out"], - "network_exception": ["network", "connection"], - "auth_exception": ["auth", "authentication", "forbidden"], - "parse_exception": ["parse", "parsing", "invalid"], - "storage_exception": ["disk", "storage", "file"], - "memory_exception": ["memory", "out of memory"], - } - for category, keywords in exception_categories.items(): - if any(keyword in message for keyword in keywords): - return category - return "other_exception" - -def categorize_status_code(status_code: int) -> str: - status_map = { - 400: "bad_request", - 401: "unauthorized", - 402: "daily quota exceeded", - 403: "forbidden", - 404: "not_found", - 408: "request_timeout", - 413: "payload_too_large", - 429: "too_many_requests", - 500: "internal_server_error", - 502: "bad_gateway", - 503: "service_unavailable", - 504: "gateway_timeout", - } - if status_code in status_map: - return status_map[status_code] - if 400 <= status_code < 500: - return "client_error_4xx" - if 500 <= status_code < 600: - return "server_error_5xx" - return f"status_{status_code}" diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py index 8e2ae77678bc..d0203b42c87e 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py @@ -18,6 +18,7 @@ _DEFAULT_STATS_SHORT_EXPORT_INTERVAL, CustomerStatsbeatProperties, DropCode, + DropCodeType, #RetryCode, CustomerStatsbeatMetricName, _CUSTOMER_STATSBEAT_LANGUAGE, @@ -27,16 +28,17 @@ from azure.monitor.opentelemetry.exporter._utils import ( Singleton, get_compute_type, - categorize_status_code, - categorize_exception_message, ) +from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( + categorize_status_code, +) from azure.monitor.opentelemetry.exporter import VERSION class _CustomerStatsbeatTelemetryCounters: def __init__(self): self.total_item_success_count: Dict[str, Any] = {} - self.total_item_drop_count: Dict[str, Dict[str, Dict[str, int]]] = {} + self.total_item_drop_count: Dict[str, Dict[DropCodeType, Dict[str, int]]] = {} class CustomerStatsbeatMetrics(metaclass=Singleton): def __init__(self, options): @@ -85,7 +87,7 @@ def count_successful_items(self, count: int, telemetry_type: str) -> None: self._counters.total_item_success_count[telemetry_type] = count def count_dropped_items( - self, count: int, telemetry_type: str, drop_code: DropCode, + self, count: int, telemetry_type: str, drop_code: DropCodeType, exception_message: Optional[str] = None ) -> None: if not self._is_enabled or count <= 0: @@ -106,7 +108,7 @@ def count_dropped_items( # Update the count for this reason current_count = reason_map.get(reason, 0) - reason_map[reason] = current_count + 1 + reason_map[reason] = current_count + count def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument if not getattr(self, "_is_enabled", False): @@ -136,7 +138,7 @@ def _item_drop_callback(self, options: CallbackOptions) -> Iterable[Observation] "language": self._customer_properties.language, "version": self._customer_properties.version, "compute_type": self._customer_properties.compute_type, - "drop.code": str(drop_code), + "drop.code": drop_code, "drop.reason": reason, "telemetry_type": telemetry_type } @@ -144,12 +146,12 @@ def _item_drop_callback(self, options: CallbackOptions) -> Iterable[Observation] return observations - def _get_drop_reason(self, drop_code: DropCode, exception_message: Optional[str] = None) -> str: + def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[str] = None) -> str: if isinstance(drop_code, int): return categorize_status_code(drop_code) if drop_code == DropCode.CLIENT_EXCEPTION: - return categorize_exception_message(exception_message) if exception_message else "unknown_exception" + return exception_message if exception_message else "unknown_exception" drop_code_reasons = { DropCode.CLIENT_READONLY: "readonly_mode", diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py index d1607c125aa6..8a982947a4a8 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py @@ -67,3 +67,26 @@ def _update_requests_map(type_name, value): else: _REQUESTS_MAP[type_name] = {} _REQUESTS_MAP[type_name][value] = prev + 1 + +def categorize_status_code(status_code: int) -> str: + status_map = { + 400: "bad_request", + 401: "unauthorized", + 402: "daily quota exceeded", + 403: "forbidden", + 404: "not_found", + 408: "request_timeout", + 413: "payload_too_large", + 429: "too_many_requests", + 500: "internal_server_error", + 502: "bad_gateway", + 503: "service_unavailable", + 504: "gateway_timeout", + } + if status_code in status_map: + return status_map[status_code] + if 400 <= status_code < 500: + return "client_error_4xx" + if 500 <= status_code < 600: + return "server_error_5xx" + return f"status_{status_code}" diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py index d09edb54d332..67885bdb4551 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py @@ -21,6 +21,9 @@ _DEFAULT_STATS_SHORT_EXPORT_INTERVAL, _UNKNOWN, _TYPE_MAP, + DropCode, + DropCodeType, + _TRACE, ) from opentelemetry import trace @@ -33,6 +36,9 @@ ) from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter from azure.monitor.opentelemetry.exporter.statsbeat._state import _REQUESTS_MAP +from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( + categorize_status_code, +) def convert_envelope_names_to_base_type(envelope_name): if not envelope_name.endswith("Data"): @@ -84,6 +90,23 @@ def tearDown(self): pass # Ignore shutdown errors CustomerStatsbeatMetrics._instance = None + def test_customer_statsbeat_not_initialized_when_disabled(self): + CustomerStatsbeatMetrics._instance = None + + with mock.patch.dict(os.environ, {"APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW": "false"}): + metrics = CustomerStatsbeatMetrics(self.mock_options) + + # Verify is_enabled flag is False + self.assertFalse(metrics._is_enabled) + + # Verify the metrics methods don't do anything when disabled + metrics.count_successful_items(5, _REQUEST) + metrics.count_dropped_items(3, _REQUEST, DropCode.CLIENT_EXCEPTION, "Test exception") + + # Verify callbacks return empty lists when disabled + self.assertEqual(metrics._item_success_callback(mock.Mock()), []) + self.assertEqual(metrics._item_drop_callback(mock.Mock()), []) + def test_successful_items_count(self): successful_dependencies = 0 @@ -118,7 +141,6 @@ def patched_transmit(self_exporter, envelopes): tracer = trace_provider.get_tracer(__name__) - # Generate random number of dependencies (between 5 and 15) total_dependencies = random.randint(5, 15) for i in range(total_dependencies): @@ -171,68 +193,66 @@ def patched_transmit(self_exporter, envelopes): for envelope in envelopes: if not hasattr(envelope, "data") or not envelope.data: continue - + envelope_name = "Microsoft.ApplicationInsights." + envelope.data.base_type telemetry_type = _BASE_TYPE_MAP.get(envelope_name, _UNKNOWN) should_fail = random.choice([True, False]) if should_fail: nonlocal dropped_items - dropped_items += 1 - from azure.monitor.opentelemetry.exporter._constants import DropCode - from azure.monitor.opentelemetry.exporter._utils import categorize_exception_message, categorize_status_code - - # Randomly choose between HTTP status code failures and client exceptions failure_type = random.choice(["http_status", "client_exception"]) if failure_type == "http_status": - # HTTP status code failures from Application Insights - status_codes = [400, 401, 402, 403, 404, 408, 429, 500, 502, 503, 504] + status_codes = [401, 401, 403, 500, 500, 503, 402] status_code = random.choice(status_codes) - categorized_reason = categorize_status_code(status_code) + failure_count = random.randint(1, 3) + dropped_items += failure_count - # Use the status code directly as the drop code (integer) - metrics.count_dropped_items(1, telemetry_type, status_code, None) + metrics.count_dropped_items(failure_count, telemetry_type, status_code, None) else: - # Client exception scenarios exception_scenarios = [ + "timeout_exception" "Connection timed out after 30 seconds", - "Request timed out after 60 seconds", # Another timeout - should aggregate - "Operation timed out", # Another timeout - should aggregate + "Request timed out after 60 seconds", + "Operation timed out", + + "network_exception", "Network connection failed: Connection refused", - "Network error: Host unreachable", # Another network error - should aggregate - "Authentication failed: Invalid credentials", - "Auth error: Token expired", # Another auth error - should aggregate + "Network error: Host unreachable", + + "authentication_exception", + "Authentication failed: Invalid credentials", + "Auth error: Token expired", + "Failed to parse response: Invalid JSON format", - "Parse error: Malformed XML", # Another parse error - should aggregate - "Disk storage full: Cannot write to file", + "Parse error: Malformed XML", + "parse_exception", + "Out of memory: Cannot allocate buffer", - "Memory allocation failed", # Another memory error - should aggregate + "Memory allocation failed", + "memory_exception", + "HTTP 401 Unauthorized", - "HTTP 401 Invalid token", # Another 401 - should aggregate - "HTTP 403 Forbidden", + "HTTP 401 Invalid token", "HTTP 500 Internal Server Error", - "HTTP 500 Database error", # Another 500 - should aggregate - "HTTP 503 Service Unavailable", + "HTTP 500 Database error", + "Unknown transmission error", - "Unexpected error occurred" # Another unknown error - should aggregate + "Unexpected error occurred" + + "storage_exception", + "other_exception" ] exception_message = random.choice(exception_scenarios) - if "HTTP" in exception_message and any(char.isdigit() for char in exception_message): - try: - status_code = int(''.join(filter(str.isdigit, exception_message))) - categorized_reason = categorize_status_code(status_code) - except ValueError: - categorized_reason = categorize_exception_message(exception_message) - else: - categorized_reason = categorize_exception_message(exception_message) - + # Simulate multiple failures for the same exception type + failure_count = random.randint(1, 4) + dropped_items += failure_count - metrics.count_dropped_items(1, telemetry_type, DropCode.CLIENT_EXCEPTION, exception_message) + metrics.count_dropped_items(failure_count, telemetry_type, DropCode.CLIENT_EXCEPTION, exception_message) continue @@ -248,7 +268,7 @@ def patched_transmit(self_exporter, envelopes): tracer = trace_provider.get_tracer(__name__) - total_items = random.randint(10, 20) + total_items = random.randint(15, 25) # Increased to get more aggregation for i in range(total_items): span_type = random.choice(["client", "server"]) @@ -291,9 +311,8 @@ def patched_transmit(self_exporter, envelopes): self.assertTrue(self.mock_options.transmit_called[0], "Exporter _transmit method was not called") + # Enhanced counting and verification logic actual_dropped_count = 0 - - category_totals = {} http_status_totals = {} client_exception_totals = {} @@ -308,13 +327,31 @@ def patched_transmit(self_exporter, envelopes): # Separate HTTP status codes from client exceptions if isinstance(drop_code, int): http_status_totals[reason] = http_status_totals.get(reason, 0) + count - else: + elif isinstance(drop_code, DropCode): client_exception_totals[reason] = client_exception_totals.get(reason, 0) + count else: actual_dropped_count += reason_map + # Test that some categories have counts > 1 (proving aggregation works) + aggregated_categories = [cat for cat, count in category_totals.items() if count > 1] + + # Main assertion self.assertEqual( actual_dropped_count, dropped_items, - f"Expected {dropped_items} dropped items, got {actual_dropped_count}" - ) \ No newline at end of file + f"Expected {dropped_items} dropped items, got {actual_dropped_count}. " + f"HTTP Status drops: {len(http_status_totals)}, Client Exception drops: {len(client_exception_totals)}" + ) + + # Verify aggregation occurred + self.assertGreater(len(http_status_totals) + len(client_exception_totals), 0, + "At least one type of drop should have occurred") + + # Verify that both integer and enum drop codes are being stored properly + drop_code_types = set() + for telemetry_type, drop_code_data in metrics._counters.total_item_drop_count.items(): + for drop_code in drop_code_data.keys(): + drop_code_types.add(type(drop_code).__name__) + + # Additional assertion to verify aggregation works + multi_count_categories = [cat for cat, count in category_totals.items() if count > 1] \ No newline at end of file From 0dec3170a71f153ae17ea37cdcc5f37dd37104c5 Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Wed, 9 Jul 2025 14:18:18 -0700 Subject: [PATCH 3/3] Updated CHANGELOG --- sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 5eedab9ec035..eb6f38b5fb68 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -8,6 +8,8 @@ ([#41733](https://github.com/Azure/azure-sdk-for-python/pull/41733)) - Added customer-facing statsbeat preview. ([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669)) +- Customer Facing Statsbeat: Added logic for dropped item count + ([#41950](https://github.com/Azure/azure-sdk-for-python/pull/41950)) ### Breaking Changes