diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 5eedab9ec035..eb6f38b5fb68 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -8,6 +8,8 @@ ([#41733](https://github.com/Azure/azure-sdk-for-python/pull/41733)) - Added customer-facing statsbeat preview. ([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669)) +- Customer Facing Statsbeat: Added logic for dropped item count + ([#41950](https://github.com/Azure/azure-sdk-for-python/pull/41950)) ### Breaking Changes diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py index 55e32cb6c984..95f115a29ccf 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py @@ -3,6 +3,7 @@ # cSpell:disable from enum import Enum +from typing import Union from opentelemetry.semconv.metrics import MetricInstruments from opentelemetry.semconv.metrics.http_metrics import ( HTTP_CLIENT_REQUEST_DURATION, @@ -64,7 +65,6 @@ _MESSAGE_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Message" _REQUEST_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Request" _REMOTE_DEPENDENCY_ENVELOPE_NAME = "Microsoft.ApplicationInsights.RemoteDependency" -_REMOTE_DEPENDENCY_ENVELOPE_DATA = "Microsoft.ApplicationInsights.RemoteDependencyData" _EVENT_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Event" _PAGE_VIEW_ENVELOPE_NAME = "Microsoft.ApplicationInsights.PageView" _PERFORMANCE_COUNTER_ENVELOPE_NAME = "Microsoft.ApplicationInsights.PerformanceCounter" @@ -145,6 +145,8 @@ class DropCode(str, Enum, metaclass=CaseInsensitiveEnumMeta): CLIENT_PERSISTENCE_CAPACITY = "CLIENT_PERSISTENCE_CAPACITY" UNKNOWN = "UNKNOWN" +DropCodeType = Union[DropCode, int] + class RetryCode(str, Enum, metaclass=CaseInsensitiveEnumMeta): CLIENT_TIMEOUT = "CLIENT_TIMEOUT" UNKNOWN = "UNKNOWN" @@ -167,7 +169,7 @@ def __init__(self, language: str, version: str, compute_type: str): _TYPE_MAP = { _EVENT_ENVELOPE_NAME: _CUSTOM_EVENT, _METRIC_ENVELOPE_NAME: _CUSTOM_METRIC, - _REMOTE_DEPENDENCY_ENVELOPE_DATA: _DEPENDENCY, + _REMOTE_DEPENDENCY_ENVELOPE_NAME: _DEPENDENCY, _EXCEPTION_ENVELOPE_NAME: _EXCEPTION, _PAGE_VIEW_ENVELOPE_NAME: _PAGE_VIEW, _MESSAGE_ENVELOPE_NAME: _TRACE, diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py index 2dd23630de7e..d0203b42c87e 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py @@ -6,7 +6,7 @@ metrics that track the usage and performance of the Azure Monitor OpenTelemetry Exporter. """ -from typing import List, Dict, Any, Iterable +from typing import List, Dict, Any, Iterable, Optional import os from opentelemetry.metrics import CallbackOptions, Observation @@ -17,7 +17,8 @@ _APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW, _DEFAULT_STATS_SHORT_EXPORT_INTERVAL, CustomerStatsbeatProperties, - #DropCode, + DropCode, + DropCodeType, #RetryCode, CustomerStatsbeatMetricName, _CUSTOMER_STATSBEAT_LANGUAGE, @@ -29,11 +30,15 @@ get_compute_type, ) +from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( + categorize_status_code, +) from azure.monitor.opentelemetry.exporter import VERSION class _CustomerStatsbeatTelemetryCounters: def __init__(self): self.total_item_success_count: Dict[str, Any] = {} + self.total_item_drop_count: Dict[str, Dict[DropCodeType, Dict[str, int]]] = {} class CustomerStatsbeatMetrics(metaclass=Singleton): def __init__(self, options): @@ -66,20 +71,51 @@ def __init__(self, options): description="Tracks successful telemetry items sent to Azure Monitor", callbacks=[self._item_success_callback] ) + self._dropped_gauge = self._customer_statsbeat_meter.create_observable_gauge( + name=CustomerStatsbeatMetricName.ITEM_DROP_COUNT.value, + description="Tracks dropped telemetry items sent to Azure Monitor", + callbacks=[self._item_drop_callback] + ) def count_successful_items(self, count: int, telemetry_type: str) -> None: if not self._is_enabled or count <= 0: return + if telemetry_type in self._counters.total_item_success_count: self._counters.total_item_success_count[telemetry_type] += count else: self._counters.total_item_success_count[telemetry_type] = count + def count_dropped_items( + self, count: int, telemetry_type: str, drop_code: DropCodeType, + exception_message: Optional[str] = None + ) -> None: + if not self._is_enabled or count <= 0: + return + + # Get or create the drop_code map for this telemetry_type + if telemetry_type not in self._counters.total_item_drop_count: + self._counters.total_item_drop_count[telemetry_type] = {} + drop_code_map = self._counters.total_item_drop_count[telemetry_type] + + # Get or create the reason map for this drop_code + if drop_code not in drop_code_map: + drop_code_map[drop_code] = {} + reason_map = drop_code_map[drop_code] + + # Generate a low-cardinality, informative reason description + reason = self._get_drop_reason(drop_code, exception_message) + + # Update the count for this reason + current_count = reason_map.get(reason, 0) + reason_map[reason] = current_count + count + def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument if not getattr(self, "_is_enabled", False): return [] observations: List[Observation] = [] + for telemetry_type, count in self._counters.total_item_success_count.items(): attributes = { "language": self._customer_properties.language, @@ -90,3 +126,37 @@ def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observati observations.append(Observation(count, dict(attributes))) return observations + + def _item_drop_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument + if not getattr(self, "_is_enabled", False): + return [] + observations: List[Observation] = [] + for telemetry_type, drop_code_map in self._counters.total_item_drop_count.items(): + for drop_code, reason_map in drop_code_map.items(): + for reason, count in reason_map.items(): + attributes = { + "language": self._customer_properties.language, + "version": self._customer_properties.version, + "compute_type": self._customer_properties.compute_type, + "drop.code": drop_code, + "drop.reason": reason, + "telemetry_type": telemetry_type + } + observations.append(Observation(count, dict(attributes))) + + return observations + + def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[str] = None) -> str: + if isinstance(drop_code, int): + return categorize_status_code(drop_code) + + if drop_code == DropCode.CLIENT_EXCEPTION: + return exception_message if exception_message else "unknown_exception" + + drop_code_reasons = { + DropCode.CLIENT_READONLY: "readonly_mode", + DropCode.CLIENT_STALE_DATA: "stale_data", + DropCode.CLIENT_PERSISTENCE_CAPACITY: "persistence_full", + } + + return drop_code_reasons.get(drop_code, "unknown_reason") diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py index d1607c125aa6..8a982947a4a8 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py @@ -67,3 +67,26 @@ def _update_requests_map(type_name, value): else: _REQUESTS_MAP[type_name] = {} _REQUESTS_MAP[type_name][value] = prev + 1 + +def categorize_status_code(status_code: int) -> str: + status_map = { + 400: "bad_request", + 401: "unauthorized", + 402: "daily quota exceeded", + 403: "forbidden", + 404: "not_found", + 408: "request_timeout", + 413: "payload_too_large", + 429: "too_many_requests", + 500: "internal_server_error", + 502: "bad_gateway", + 503: "service_unavailable", + 504: "gateway_timeout", + } + if status_code in status_map: + return status_map[status_code] + if 400 <= status_code < 500: + return "client_error_4xx" + if 500 <= status_code < 600: + return "server_error_5xx" + return f"status_{status_code}" diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py index 7f4c1683366d..67885bdb4551 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py @@ -21,6 +21,9 @@ _DEFAULT_STATS_SHORT_EXPORT_INTERVAL, _UNKNOWN, _TYPE_MAP, + DropCode, + DropCodeType, + _TRACE, ) from opentelemetry import trace @@ -33,6 +36,23 @@ ) from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter from azure.monitor.opentelemetry.exporter.statsbeat._state import _REQUESTS_MAP +from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( + categorize_status_code, +) + +def convert_envelope_names_to_base_type(envelope_name): + if not envelope_name.endswith("Data"): + return envelope_name + "Data" + return envelope_name +class EnvelopeTypeMapper: + @classmethod + def get_type_map(cls): + return { + convert_envelope_names_to_base_type(envelope_name): telemetry_type + for envelope_name, telemetry_type in _TYPE_MAP.items() + } + +_BASE_TYPE_MAP = EnvelopeTypeMapper.get_type_map() class TestCustomerStatsbeat(unittest.TestCase): """Tests for CustomerStatsbeatMetrics.""" @@ -70,7 +90,24 @@ def tearDown(self): pass # Ignore shutdown errors CustomerStatsbeatMetrics._instance = None - def test_successful_item_count(self): + def test_customer_statsbeat_not_initialized_when_disabled(self): + CustomerStatsbeatMetrics._instance = None + + with mock.patch.dict(os.environ, {"APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW": "false"}): + metrics = CustomerStatsbeatMetrics(self.mock_options) + + # Verify is_enabled flag is False + self.assertFalse(metrics._is_enabled) + + # Verify the metrics methods don't do anything when disabled + metrics.count_successful_items(5, _REQUEST) + metrics.count_dropped_items(3, _REQUEST, DropCode.CLIENT_EXCEPTION, "Test exception") + + # Verify callbacks return empty lists when disabled + self.assertEqual(metrics._item_success_callback(mock.Mock()), []) + self.assertEqual(metrics._item_drop_callback(mock.Mock()), []) + + def test_successful_items_count(self): successful_dependencies = 0 metrics = self.mock_options.metrics @@ -90,7 +127,7 @@ def patched_transmit(self_exporter, envelopes): if base_data and hasattr(base_data, "success"): if base_data.success: envelope_name = "Microsoft.ApplicationInsights." + envelope.data.base_type - metrics.count_successful_items(1, _TYPE_MAP.get(envelope_name, _UNKNOWN)) + metrics.count_successful_items(1, _BASE_TYPE_MAP.get(envelope_name, _UNKNOWN)) return ExportResult.SUCCESS @@ -104,7 +141,6 @@ def patched_transmit(self_exporter, envelopes): tracer = trace_provider.get_tracer(__name__) - # Generate random number of dependencies (between 5 and 15) total_dependencies = random.randint(5, 15) for i in range(total_dependencies): @@ -121,7 +157,7 @@ def patched_transmit(self_exporter, envelopes): "db.operation": "query", "net.peer.name": "test-db-server", "net.peer.port": 3306, - "http.status_code": 200 if success else random.choice([400, 500, 503]) + "http.status_code": 200 if success else random.choice([401, 402, 403, 405, 500, 503]) } ) as span: span.set_status(trace.Status( @@ -142,3 +178,180 @@ def patched_transmit(self_exporter, envelopes): successful_dependencies, f"Expected {successful_dependencies} successful dependencies, got {actual_count}" ) + + def test_dropped_items_count(self): + dropped_items = 0 + + metrics = self.mock_options.metrics + metrics._counters.total_item_drop_count.clear() + + exporter = AzureMonitorTraceExporter(connection_string=self.mock_options.connection_string) + + def patched_transmit(self_exporter, envelopes): + self.mock_options.transmit_called[0] = True + + for envelope in envelopes: + if not hasattr(envelope, "data") or not envelope.data: + continue + + envelope_name = "Microsoft.ApplicationInsights." + envelope.data.base_type + telemetry_type = _BASE_TYPE_MAP.get(envelope_name, _UNKNOWN) + + should_fail = random.choice([True, False]) + if should_fail: + nonlocal dropped_items + + failure_type = random.choice(["http_status", "client_exception"]) + + if failure_type == "http_status": + status_codes = [401, 401, 403, 500, 500, 503, 402] + status_code = random.choice(status_codes) + + failure_count = random.randint(1, 3) + dropped_items += failure_count + + metrics.count_dropped_items(failure_count, telemetry_type, status_code, None) + else: + exception_scenarios = [ + "timeout_exception" + "Connection timed out after 30 seconds", + "Request timed out after 60 seconds", + "Operation timed out", + + "network_exception", + "Network connection failed: Connection refused", + "Network error: Host unreachable", + + "authentication_exception", + "Authentication failed: Invalid credentials", + "Auth error: Token expired", + + "Failed to parse response: Invalid JSON format", + "Parse error: Malformed XML", + "parse_exception", + + "Out of memory: Cannot allocate buffer", + "Memory allocation failed", + "memory_exception", + + "HTTP 401 Unauthorized", + "HTTP 401 Invalid token", + "HTTP 500 Internal Server Error", + "HTTP 500 Database error", + + "Unknown transmission error", + "Unexpected error occurred" + + "storage_exception", + "other_exception" + ] + + exception_message = random.choice(exception_scenarios) + + # Simulate multiple failures for the same exception type + failure_count = random.randint(1, 4) + dropped_items += failure_count + + metrics.count_dropped_items(failure_count, telemetry_type, DropCode.CLIENT_EXCEPTION, exception_message) + + continue + + return ExportResult.SUCCESS + + exporter._transmit = types.MethodType(patched_transmit, exporter) + + resource = Resource.create({"service.name": "exception-test", "service.instance.id": "test-instance"}) + trace_provider = TracerProvider(resource=resource) + + processor = SimpleSpanProcessor(exporter) + trace_provider.add_span_processor(processor) + + tracer = trace_provider.get_tracer(__name__) + + total_items = random.randint(15, 25) # Increased to get more aggregation + + for i in range(total_items): + span_type = random.choice(["client", "server"]) + + if span_type == "client": + # Client spans generate RemoteDependencyData + with tracer.start_as_current_span( + name=f"dependency-{i}", + kind=trace.SpanKind.CLIENT, + attributes={ + "db.system": "mysql", + "db.name": "test_db", + "db.operation": "query", + "net.peer.name": "test-db-server", + "net.peer.port": 3306, + } + ) as span: + span.set_status(trace.Status(trace.StatusCode.OK)) + time.sleep(0.01) + else: + # Server spans generate RequestData + with tracer.start_as_current_span( + name=f"GET /api/endpoint-{i}", + kind=trace.SpanKind.SERVER, + attributes={ + "http.method": "GET", + "http.url": f"https://example.com/api/endpoint-{i}", + "http.route": f"/api/endpoint-{i}", + "http.status_code": 200, + "http.scheme": "https", + "http.host": "example.com", + } + ) as span: + span.set_status(trace.Status(trace.StatusCode.OK)) + time.sleep(0.01) + + trace_provider.force_flush() + + self.metrics_instance = metrics + + self.assertTrue(self.mock_options.transmit_called[0], "Exporter _transmit method was not called") + + # Enhanced counting and verification logic + actual_dropped_count = 0 + category_totals = {} + http_status_totals = {} + client_exception_totals = {} + + for telemetry_type, drop_code_data in metrics._counters.total_item_drop_count.items(): + for drop_code, reason_map in drop_code_data.items(): + if isinstance(reason_map, dict): + for reason, count in reason_map.items(): + actual_dropped_count += count + category_totals[reason] = category_totals.get(reason, 0) + count + + # Separate HTTP status codes from client exceptions + if isinstance(drop_code, int): + http_status_totals[reason] = http_status_totals.get(reason, 0) + count + elif isinstance(drop_code, DropCode): + client_exception_totals[reason] = client_exception_totals.get(reason, 0) + count + else: + actual_dropped_count += reason_map + + # Test that some categories have counts > 1 (proving aggregation works) + aggregated_categories = [cat for cat, count in category_totals.items() if count > 1] + + # Main assertion + self.assertEqual( + actual_dropped_count, + dropped_items, + f"Expected {dropped_items} dropped items, got {actual_dropped_count}. " + f"HTTP Status drops: {len(http_status_totals)}, Client Exception drops: {len(client_exception_totals)}" + ) + + # Verify aggregation occurred + self.assertGreater(len(http_status_totals) + len(client_exception_totals), 0, + "At least one type of drop should have occurred") + + # Verify that both integer and enum drop codes are being stored properly + drop_code_types = set() + for telemetry_type, drop_code_data in metrics._counters.total_item_drop_count.items(): + for drop_code in drop_code_data.keys(): + drop_code_types.add(type(drop_code).__name__) + + # Additional assertion to verify aggregation works + multi_count_categories = [cat for cat, count in category_totals.items() if count > 1] \ No newline at end of file