From c127567b1f2aa690ea47a7c81849575f85da27ee Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Wed, 9 Jul 2025 07:42:30 -0700 Subject: [PATCH 1/5] Added changes for rate limited sampler azure-exporter --- .../opentelemetry/exporter/__init__.py | 2 + .../opentelemetry/exporter/_constants.py | 3 + .../export/trace/_rate_limited_sampling.py | 160 ++++++ .../exporter/export/trace/_utils.py | 36 ++ .../tests/trace/test_rate_limited_sampling.py | 489 ++++++++++++++++++ 5 files changed, 690 insertions(+) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py index feaa0fec8584..ffad52399d8d 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py @@ -8,10 +8,12 @@ from azure.monitor.opentelemetry.exporter.export.metrics._exporter import AzureMonitorMetricExporter from azure.monitor.opentelemetry.exporter.export.trace._exporter import AzureMonitorTraceExporter from azure.monitor.opentelemetry.exporter.export.trace._sampling import ApplicationInsightsSampler +from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import RateLimitedSampler from ._version import VERSION __all__ = [ "ApplicationInsightsSampler", + "RateLimitedSampler", "AzureMonitorMetricExporter", "AzureMonitorLogExporter", "AzureMonitorTraceExporter", diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py index 55e32cb6c984..9da99d600136 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py @@ -287,6 +287,9 @@ class _RP_Names(Enum): # sampleRate _SAMPLE_RATE_KEY = "_MS.sampleRate" +_HASH = 5381 +_INTEGER_MAX: int = 2**31 - 1 +_INTEGER_MIN: int = -2**31 # AAD Auth diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py new file mode 100644 index 000000000000..e7f05624b5f7 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py @@ -0,0 +1,160 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import math +import threading +import time +from typing import Callable, Optional, Sequence +from opentelemetry.context import Context +from opentelemetry.trace import Link, SpanKind, format_trace_id, get_current_span +from opentelemetry.sdk.trace.sampling import ( + Decision, + Sampler, + SamplingResult, + _get_parent_trace_state, +) +from opentelemetry.trace.span import TraceState +from opentelemetry.util.types import Attributes + +from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY + +from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_djb2_sample_score, _round_down_to_nearest + +class _State: + def __init__(self, effective_window_count: float, effective_window_nanos: float, last_nano_time: int): + self.effective_window_count = effective_window_count + self.effective_window_nanos = effective_window_nanos + self.last_nano_time = last_nano_time + +class RateLimitedSamplingPercentage: + def __init__(self, target_spans_per_second_limit: float, + nano_time_supplier: Optional[Callable[[], int]] = None, round_to_nearest: bool = True): + if target_spans_per_second_limit < 0.0: + raise ValueError("Limit for sampled spans per second must be nonnegative!") + self._nano_time_supplier = nano_time_supplier or (lambda: int(time.time_ns())) + # Hardcoded adaptation time of 0.1 seconds for adjusting to sudden changes in telemetry volumes + adaptation_time_seconds = 0.1 + self._inverse_adaptation_time_nanos = 1e-9 / adaptation_time_seconds + self._target_spans_per_nanosecond_limit = 1e-9 * target_spans_per_second_limit + initial_nano_time = self._nano_time_supplier() + self._state = _State(0.0, 0.0, initial_nano_time) + self._lock = threading.Lock() + self._round_to_nearest = round_to_nearest + + def _update_state(self, old_state: _State, current_nano_time: int) -> _State: + if current_nano_time <= old_state.last_nano_time: + return _State( + old_state.effective_window_count + 1, + old_state.effective_window_nanos, + old_state.last_nano_time + ) + nano_time_delta = current_nano_time - old_state.last_nano_time + decay_factor = math.exp(-nano_time_delta * self._inverse_adaptation_time_nanos) + current_effective_window_count = old_state.effective_window_count * decay_factor + 1 + current_effective_window_nanos = old_state.effective_window_nanos * decay_factor + nano_time_delta + + return _State(current_effective_window_count, current_effective_window_nanos, current_nano_time) + + def get(self) -> float: + """Get the current sampling percentage (0.0 to 100.0).""" + current_nano_time = self._nano_time_supplier() + + with self._lock: + old_state = self._state + self._state = self._update_state(self._state, current_nano_time) + current_state = self._state + + # Calculate sampling probability based on current state + if current_state.effective_window_count == 0: + return 100.0 + + sampling_probability = ( + (current_state.effective_window_nanos * self._target_spans_per_nanosecond_limit) / + current_state.effective_window_count + ) + + sampling_percentage = 100 * min(sampling_probability, 1.0) + + if self._round_to_nearest: + sampling_percentage = _round_down_to_nearest(sampling_percentage) + + return sampling_percentage + + +class RateLimitedSampler(Sampler): + def __init__(self, target_spans_per_second_limit: float): + self._sampling_percentage_generator = RateLimitedSamplingPercentage(target_spans_per_second_limit) + self._description = f"RateLimitedSampler{{{target_spans_per_second_limit}}}" + + def should_sample( + self, + parent_context: Optional[Context], + trace_id: int, + name: str, + kind: Optional[SpanKind] = None, + attributes: Attributes = None, + links: Optional[Sequence["Link"]] = None, + trace_state: Optional["TraceState"] = None, + ) -> "SamplingResult": + + if parent_context is not None: + parent_span = get_current_span(parent_context) + parent_span_context = parent_span.get_span_context() + + # Check if parent is valid and local (not remote) + if parent_span_context.is_valid and not parent_span_context.is_remote: + # Check if parent was dropped/record-only first + if not parent_span.is_recording(): + # Parent was dropped, drop this child too + if attributes is None: + new_attributes = {} + else: + new_attributes = dict(attributes) + new_attributes[_SAMPLE_RATE_KEY] = 0.0 + + return SamplingResult( + Decision.DROP, + new_attributes, + _get_parent_trace_state(parent_context), + ) + + # Parent is recording, check for sample rate attribute + parent_attributes = getattr(parent_span, 'attributes', {}) + parent_sample_rate = parent_attributes.get(_SAMPLE_RATE_KEY) + + if parent_sample_rate is not None: + # Honor parent's sampling rate + if attributes is None: + new_attributes = {} + else: + new_attributes = dict(attributes) + new_attributes[_SAMPLE_RATE_KEY] = parent_sample_rate + + return SamplingResult( + Decision.RECORD_AND_SAMPLE, + new_attributes, + _get_parent_trace_state(parent_context), + ) + + sampling_percentage = self._sampling_percentage_generator.get() + sampling_score = _get_djb2_sample_score(format_trace_id(trace_id).lower()) + + if sampling_score < sampling_percentage: + decision = Decision.RECORD_AND_SAMPLE + else: + decision = Decision.DROP + + if attributes is None: + new_attributes = {} + else: + new_attributes = dict(attributes) + new_attributes[_SAMPLE_RATE_KEY] = sampling_percentage + + return SamplingResult( + decision, + new_attributes, + _get_parent_trace_state(parent_context), + ) + + def get_description(self) -> str: + return self._description \ No newline at end of file diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py index fa8fc17fe9b5..b3c3b6b3d27c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py @@ -3,6 +3,7 @@ from typing import no_type_check, Optional, Tuple from urllib.parse import urlparse +import math from opentelemetry.semconv.attributes import ( client_attributes, @@ -13,6 +14,11 @@ from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes from opentelemetry.util.types import Attributes +from azure.monitor.opentelemetry.exporter._constants import ( + _HASH, + _INTEGER_MAX, + _INTEGER_MIN +) # pylint:disable=too-many-return-statements def _get_default_port_db(db_system: str) -> int: @@ -320,3 +326,33 @@ def _get_url_for_http_request(attributes: Attributes) -> Optional[str]: http_target, ) return url + +def _get_djb2_sample_score(trace_id_hex: str) -> float: + # This algorithm uses 32bit integers + hash_value = _HASH + for char in trace_id_hex: + hash_value = ((hash_value << 5) + hash_value) + ord(char) + hash_value &= 0xFFFFFFFF # simulate 32-bit integer overflow + + # Convert to signed 32-bit int + if hash_value & 0x80000000: + hash_value = -((~hash_value & 0xFFFFFFFF) + 1) + + if hash_value == _INTEGER_MIN: + hash_value = int(_INTEGER_MAX) + else: + hash_value = abs(hash_value) + + return 100.0 * (float(hash_value) / _INTEGER_MAX) + +def _round_down_to_nearest(sampling_percentage: float) -> float: + if sampling_percentage == 0: + return 0 + # Handle extremely small percentages that would cause overflow + if sampling_percentage <= _INTEGER_MIN: # Extremely small threshold + return 0.0 + item_count = 100.0 / sampling_percentage + # Handle case where item_count is infinity or too large for math.ceil + if not math.isfinite(item_count) or item_count >= _INTEGER_MAX: + return 0.0 + return 100.0 / math.ceil(item_count) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py new file mode 100644 index 000000000000..42ebda609a4a --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py @@ -0,0 +1,489 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import random +import sys +import threading +import time +import unittest +from typing import List, Optional +from unittest.mock import patch, Mock + +from opentelemetry.context import Context +from opentelemetry.trace import SpanContext, TraceFlags, set_span_in_context +from opentelemetry.trace.span import NonRecordingSpan + +from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import ( + Decision, + RateLimitedSamplingPercentage, + RateLimitedSampler, + SamplingResult, +) + +from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY + + +def create_parent_span(sampled: bool, sample_rate: Optional[float] = None, is_remote: bool = False): + trace_flags = TraceFlags.SAMPLED if sampled else TraceFlags.DEFAULT + + span_context = SpanContext( + trace_id=0x1234567890abcdef1234567890abcdef, + span_id=0x1234567890abcdef, + is_remote=is_remote, + trace_flags=trace_flags, + trace_state=None, + ) + + mock_span = Mock() + mock_span.get_span_context.return_value = span_context + mock_span.is_recording.return_value = sampled + + attributes = {} + if sample_rate is not None: + attributes[_SAMPLE_RATE_KEY] = sample_rate + mock_span.attributes = attributes + + return mock_span + + +class TestRateLimitedSampler(unittest.TestCase): + + def setUp(self): + self.nano_time = [1_000_000_000_000] + self.nano_time_supplier = lambda: self.nano_time[0] + + def advance_time(self, nanos_increment: int): + self.nano_time[0] += nanos_increment + + def get_current_time_nanos(self) -> int: + return self.nano_time[0] + + def test_constant_rate_sampling(self): + target_rate = 1000.0 + sampler = RateLimitedSampler(target_rate) + + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + sampler._sampling_percentage_generator._round_to_nearest = False + + nanos_between_spans = 10_000_000 + num_spans = 10 + sampled_count = 0 + + for i in range(num_spans): + self.advance_time(nanos_between_spans) + + result = sampler.should_sample( + parent_context=None, + trace_id=i, + name=f"test-span-{i}" + ) + + self.assertIsInstance(result, SamplingResult) + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + self.assertIn(_SAMPLE_RATE_KEY, result.attributes) + + if result.decision == Decision.RECORD_AND_SAMPLE: + sampled_count += 1 + + self.assertGreater(sampled_count, 0, "Should sample some spans with high target rate") + + def test_high_volume_sampling(self): + target_rate = 5.0 + sampler = RateLimitedSampler(target_rate) + + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + nanos_between_spans = 1_000_000 + num_spans = 500 + sampled_count = 0 + + import random + random.seed(42) + trace_ids = [random.getrandbits(128) for _ in range(num_spans)] + + for i in range(num_spans): + self.advance_time(nanos_between_spans) + + result = sampler.should_sample( + parent_context=None, + trace_id=trace_ids[i], + name=f"high-volume-span-{i}" + ) + + if result.decision == Decision.RECORD_AND_SAMPLE: + sampled_count += 1 + + self.assertGreater(sampled_count, 0, "Should sample at least some spans even under high load") + self.assertLess(sampled_count, num_spans * 0.1, "Should throttle significantly under high load") + + def test_rate_adaptation_increasing_load(self): + target_rate = 20.0 + sampler = RateLimitedSampler(target_rate) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + low_rate_interval = 20_000_000 + phase1_spans = 100 + + high_rate_interval = 2_000_000 + phase2_spans = 1000 + + sampled_phase1 = 0 + sampled_phase2 = 0 + + import random + random.seed(123) + trace_ids_phase1 = [random.getrandbits(128) for _ in range(phase1_spans)] + trace_ids_phase2 = [random.getrandbits(128) for _ in range(phase2_spans)] + + for i in range(phase1_spans): + self.advance_time(low_rate_interval) + result = sampler.should_sample(None, trace_ids_phase1[i], f"low-{i}") + if result.decision == Decision.RECORD_AND_SAMPLE: + sampled_phase1 += 1 + + for i in range(phase2_spans): + self.advance_time(high_rate_interval) + result = sampler.should_sample(None, trace_ids_phase2[i], f"high-{i}") + if result.decision == Decision.RECORD_AND_SAMPLE: + sampled_phase2 += 1 + + self.assertGreater(sampled_phase1, 0, "Should sample in low rate phase") + self.assertGreater(sampled_phase2, 0, "Should sample in high rate phase") + + phase1_percentage = (sampled_phase1 / phase1_spans) * 100 + phase2_percentage = (sampled_phase2 / phase2_spans) * 100 + + self.assertLess(phase2_percentage, phase1_percentage, + "Sampling percentage should decrease under high load") + + def test_sampler_creation(self): + for target_rate in [0.1, 0.5, 1.0, 5.0, 100.0]: + sampler = RateLimitedSampler(target_rate) + self.assertIsInstance(sampler, RateLimitedSampler) + self.assertEqual( + sampler.get_description(), + f"RateLimitedSampler{{{target_rate}}}" + ) + + def test_negative_rate_raises_error(self): + with self.assertRaises(ValueError): + RateLimitedSampler(-1.0) + + def test_zero_rate_sampling(self): + sampler = RateLimitedSampler(0.0) + + for i in range(100): + result = sampler.should_sample( + parent_context=None, + trace_id=i, + name="test-span" + ) + + self.assertIsInstance(result, SamplingResult) + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + self.assertIn(_SAMPLE_RATE_KEY, result.attributes) + + def test_sampling_decision_consistency(self): + sampler = RateLimitedSampler(50.0) + + trace_id = 12345 + + results = [] + for _ in range(10): + result = sampler.should_sample( + parent_context=None, + trace_id=trace_id, + name="test-span" + ) + results.append(result) + + first_decision = results[0].decision + for result in results[1:]: + self.assertEqual(result.decision, first_decision, + "Sampling decision should be consistent for same trace ID") + + def test_sampling_attributes(self): + sampler = RateLimitedSampler(25.0) + + result = sampler.should_sample( + parent_context=None, + trace_id=123, + name="test-span" + ) + + self.assertIsInstance(result, SamplingResult) + self.assertIn(_SAMPLE_RATE_KEY, result.attributes) + + sample_rate = result.attributes[_SAMPLE_RATE_KEY] + self.assertIsInstance(sample_rate, (int, float)) + if isinstance(sample_rate, (int, float)): + self.assertGreaterEqual(float(sample_rate), 0.0) + self.assertLessEqual(float(sample_rate), 100.0) + + def test_sampler_with_extreme_trace_ids(self): + sampler = RateLimitedSampler(1.0) + + extreme_trace_ids = [ + 0, + 1, + 2**32 - 1, + 2**64 - 1, + 0xabcdef123456789, + ] + + for trace_id in extreme_trace_ids: + with self.subTest(trace_id=trace_id): + result = sampler.should_sample( + parent_context=None, + trace_id=trace_id, + name="test-span" + ) + + self.assertIsInstance(result, SamplingResult) + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + self.assertIn(_SAMPLE_RATE_KEY, result.attributes) + + sample_rate = result.attributes[_SAMPLE_RATE_KEY] + self.assertIsInstance(sample_rate, (int, float)) + if isinstance(sample_rate, (int, float)): + self.assertGreaterEqual(float(sample_rate), 0.0) + self.assertLessEqual(float(sample_rate), 100.0) + + def test_thread_safety(self): + sampler = RateLimitedSampler(10.0) + results = [] + errors = [] + + def worker(): + try: + for i in range(50): + result = sampler.should_sample(None, i, f"thread-span-{i}") + results.append(result) + time.sleep(0.001) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=worker) for _ in range(5)] + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + + self.assertEqual(len(errors), 0, f"Thread safety errors: {errors}") + self.assertGreater(len(results), 0) + for result in results: + self.assertIsInstance(result, SamplingResult) + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + self.assertIn(_SAMPLE_RATE_KEY, result.attributes) + + def test_parent_span_sampled_with_sample_rate(self): + sampler = RateLimitedSampler(10.0) + + parent_span = create_parent_span(sampled=True, sample_rate=75.0, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertEqual(result.decision, Decision.RECORD_AND_SAMPLE) + self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 75.0) + + def test_parent_span_not_sampled_with_sample_rate(self): + sampler = RateLimitedSampler(10.0) + + parent_span = create_parent_span(sampled=False, sample_rate=25.0, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertEqual(result.decision, Decision.DROP) + self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 0.0) + + def test_parent_span_sampled_with_100_percent_sample_rate(self): + sampler = RateLimitedSampler(5.0) + + parent_span = create_parent_span(sampled=True, sample_rate=100.0, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertEqual(result.decision, Decision.RECORD_AND_SAMPLE) + self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 100.0) + + def test_parent_span_remote_ignored(self): + sampler = RateLimitedSampler(5.0) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + + parent_span = create_parent_span(sampled=True, sample_rate=80.0, is_remote=True) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + self.advance_time(100_000_000) + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertNotEqual(result.attributes[_SAMPLE_RATE_KEY], 80.0) + + def test_parent_span_no_sample_rate_attribute(self): + sampler = RateLimitedSampler(5.0) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + + parent_span = create_parent_span(sampled=True, sample_rate=None, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + self.advance_time(100_000_000) + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + sample_rate = result.attributes[_SAMPLE_RATE_KEY] + self.assertIsInstance(sample_rate, (int, float)) + + def test_parent_span_invalid_context(self): + sampler = RateLimitedSampler(5.0) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + + parent_span = Mock() + invalid_context = Mock() + invalid_context.is_valid = False + invalid_context.is_remote = False + parent_span.get_span_context.return_value = invalid_context + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + self.advance_time(100_000_000) + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + sample_rate = result.attributes[_SAMPLE_RATE_KEY] + self.assertIsInstance(sample_rate, (int, float)) + + def test_no_parent_context_uses_local_sampling(self): + sampler = RateLimitedSampler(5.0) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + self.advance_time(100_000_000) + + result = sampler.should_sample( + parent_context=None, + trace_id=0xabc123, + name="test-span" + ) + + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + sample_rate = result.attributes[_SAMPLE_RATE_KEY] + self.assertIsInstance(sample_rate, (int, float)) + + def test_parent_context_preserves_original_attributes(self): + sampler = RateLimitedSampler(10.0) + + parent_span = create_parent_span(sampled=True, sample_rate=50.0, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + original_attributes = { + "service.name": "test-service", + "operation.name": "test-operation" + } + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span", + attributes=original_attributes + ) + + self.assertEqual(result.decision, Decision.RECORD_AND_SAMPLE) + self.assertEqual(result.attributes["service.name"], "test-service") + self.assertEqual(result.attributes["operation.name"], "test-operation") + self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 50.0) + +class TestUtilityFunctions(unittest.TestCase): + + def test_djb2_hash_consistency(self): + from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_djb2_sample_score + + trace_id = "test-trace-id-12345" + + scores = [_get_djb2_sample_score(trace_id) for _ in range(10)] + + self.assertTrue(all(score == scores[0] for score in scores)) + + def test_djb2_hash_edge_cases(self): + from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_djb2_sample_score + + edge_cases = [ + "", + "0", + "a" * 1000, + "0123456789abcdef" * 8, + ] + + for trace_id in edge_cases: + with self.subTest(trace_id=trace_id): + score = _get_djb2_sample_score(trace_id) + self.assertIsInstance(score, float) + self.assertGreaterEqual(score, 0) + self.assertLess(score, 100) \ No newline at end of file From 4efd462f75c06bac1434c61f99872a17fb70e1f5 Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Wed, 9 Jul 2025 07:55:46 -0700 Subject: [PATCH 2/5] Added CHANGELOG entry --- sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 5eedab9ec035..0d1bd9ab94fa 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -8,6 +8,8 @@ ([#41733](https://github.com/Azure/azure-sdk-for-python/pull/41733)) - Added customer-facing statsbeat preview. ([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669)) +- Added RateLimited Sampler + ([#41925](https://github.com/Azure/azure-sdk-for-python/pull/41925)) ### Breaking Changes From c7d66085238b32e04f7fdd9ef90b66657ab6eeaf Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Wed, 9 Jul 2025 09:23:59 -0700 Subject: [PATCH 3/5] Fixed spell check errors --- .../export/trace/_rate_limited_sampling.py | 55 +++++++++---------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py index e7f05624b5f7..05e07d20d0ef 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py @@ -21,9 +21,9 @@ from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_djb2_sample_score, _round_down_to_nearest class _State: - def __init__(self, effective_window_count: float, effective_window_nanos: float, last_nano_time: int): + def __init__(self, effective_window_count: float, effective_window_nanoseconds: float, last_nano_time: int): self.effective_window_count = effective_window_count - self.effective_window_nanos = effective_window_nanos + self.effective_window_nanoseconds = effective_window_nanoseconds self.last_nano_time = last_nano_time class RateLimitedSamplingPercentage: @@ -34,7 +34,7 @@ def __init__(self, target_spans_per_second_limit: float, self._nano_time_supplier = nano_time_supplier or (lambda: int(time.time_ns())) # Hardcoded adaptation time of 0.1 seconds for adjusting to sudden changes in telemetry volumes adaptation_time_seconds = 0.1 - self._inverse_adaptation_time_nanos = 1e-9 / adaptation_time_seconds + self._inverse_adaptation_time_nanoseconds = 1e-9 / adaptation_time_seconds self._target_spans_per_nanosecond_limit = 1e-9 * target_spans_per_second_limit initial_nano_time = self._nano_time_supplier() self._state = _State(0.0, 0.0, initial_nano_time) @@ -45,39 +45,38 @@ def _update_state(self, old_state: _State, current_nano_time: int) -> _State: if current_nano_time <= old_state.last_nano_time: return _State( old_state.effective_window_count + 1, - old_state.effective_window_nanos, + old_state.effective_window_nanoseconds, old_state.last_nano_time ) nano_time_delta = current_nano_time - old_state.last_nano_time - decay_factor = math.exp(-nano_time_delta * self._inverse_adaptation_time_nanos) + decay_factor = math.exp(-nano_time_delta * self._inverse_adaptation_time_nanoseconds) current_effective_window_count = old_state.effective_window_count * decay_factor + 1 - current_effective_window_nanos = old_state.effective_window_nanos * decay_factor + nano_time_delta - - return _State(current_effective_window_count, current_effective_window_nanos, current_nano_time) + current_effective_window_nanoseconds = old_state.effective_window_nanoseconds * decay_factor + nano_time_delta + + return _State(current_effective_window_count, current_effective_window_nanoseconds, current_nano_time) def get(self) -> float: - """Get the current sampling percentage (0.0 to 100.0).""" current_nano_time = self._nano_time_supplier() - + with self._lock: old_state = self._state - self._state = self._update_state(self._state, current_nano_time) + self._state = self._update_state(old_state, current_nano_time) current_state = self._state - + # Calculate sampling probability based on current state if current_state.effective_window_count == 0: return 100.0 - + sampling_probability = ( - (current_state.effective_window_nanos * self._target_spans_per_nanosecond_limit) / + (current_state.effective_window_nanoseconds * self._target_spans_per_nanosecond_limit) / current_state.effective_window_count ) - + sampling_percentage = 100 * min(sampling_probability, 1.0) - + if self._round_to_nearest: sampling_percentage = _round_down_to_nearest(sampling_percentage) - + return sampling_percentage @@ -96,11 +95,11 @@ def should_sample( links: Optional[Sequence["Link"]] = None, trace_state: Optional["TraceState"] = None, ) -> "SamplingResult": - + if parent_context is not None: parent_span = get_current_span(parent_context) parent_span_context = parent_span.get_span_context() - + # Check if parent is valid and local (not remote) if parent_span_context.is_valid and not parent_span_context.is_remote: # Check if parent was dropped/record-only first @@ -111,17 +110,17 @@ def should_sample( else: new_attributes = dict(attributes) new_attributes[_SAMPLE_RATE_KEY] = 0.0 - + return SamplingResult( Decision.DROP, new_attributes, _get_parent_trace_state(parent_context), ) - + # Parent is recording, check for sample rate attribute parent_attributes = getattr(parent_span, 'attributes', {}) parent_sample_rate = parent_attributes.get(_SAMPLE_RATE_KEY) - + if parent_sample_rate is not None: # Honor parent's sampling rate if attributes is None: @@ -129,27 +128,27 @@ def should_sample( else: new_attributes = dict(attributes) new_attributes[_SAMPLE_RATE_KEY] = parent_sample_rate - + return SamplingResult( Decision.RECORD_AND_SAMPLE, new_attributes, _get_parent_trace_state(parent_context), ) - + sampling_percentage = self._sampling_percentage_generator.get() sampling_score = _get_djb2_sample_score(format_trace_id(trace_id).lower()) - + if sampling_score < sampling_percentage: decision = Decision.RECORD_AND_SAMPLE else: decision = Decision.DROP - + if attributes is None: new_attributes = {} else: new_attributes = dict(attributes) new_attributes[_SAMPLE_RATE_KEY] = sampling_percentage - + return SamplingResult( decision, new_attributes, @@ -157,4 +156,4 @@ def should_sample( ) def get_description(self) -> str: - return self._description \ No newline at end of file + return self._description From a08ee78f460b28b0778da341260300a26ff5e6b5 Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Wed, 9 Jul 2025 10:20:50 -0700 Subject: [PATCH 4/5] Fixed spell check in tests --- .../tests/trace/test_rate_limited_sampling.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py index 42ebda609a4a..0e5f489f1337 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py @@ -52,10 +52,10 @@ def setUp(self): self.nano_time = [1_000_000_000_000] self.nano_time_supplier = lambda: self.nano_time[0] - def advance_time(self, nanos_increment: int): - self.nano_time[0] += nanos_increment + def advance_time(self, nanoseconds_increment: int): + self.nano_time[0] += nanoseconds_increment - def get_current_time_nanos(self) -> int: + def get_current_time_nanoseconds(self) -> int: return self.nano_time[0] def test_constant_rate_sampling(self): @@ -68,12 +68,12 @@ def test_constant_rate_sampling(self): sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) sampler._sampling_percentage_generator._round_to_nearest = False - nanos_between_spans = 10_000_000 + nanoseconds_between_spans = 10_000_000 num_spans = 10 sampled_count = 0 for i in range(num_spans): - self.advance_time(nanos_between_spans) + self.advance_time(nanoseconds_between_spans) result = sampler.should_sample( parent_context=None, @@ -99,7 +99,7 @@ def test_high_volume_sampling(self): initial_time = self.nano_time_supplier() sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) - nanos_between_spans = 1_000_000 + nanoseconds_between_spans = 1_000_000 num_spans = 500 sampled_count = 0 @@ -108,7 +108,7 @@ def test_high_volume_sampling(self): trace_ids = [random.getrandbits(128) for _ in range(num_spans)] for i in range(num_spans): - self.advance_time(nanos_between_spans) + self.advance_time(nanoseconds_between_spans) result = sampler.should_sample( parent_context=None, From 463011e0dc0e6f4eb892a96eaed7eedb785a253c Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Wed, 9 Jul 2025 14:05:41 -0700 Subject: [PATCH 5/5] CHANGELOG updated --- sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 0d1bd9ab94fa..5b3dfbd859c4 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -9,7 +9,7 @@ - Added customer-facing statsbeat preview. ([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669)) - Added RateLimited Sampler - ([#41925](https://github.com/Azure/azure-sdk-for-python/pull/41925)) + ([#41954](https://github.com/Azure/azure-sdk-for-python/pull/41954)) ### Breaking Changes