From 21e19b89156af4bcf8848b056bdf4a660e7bdc67 Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Mon, 7 Jul 2025 17:29:07 -0700 Subject: [PATCH 1/6] Added RateLimitedSampler --- .../opentelemetry/exporter/_constants.py | 3 + .../export/trace/_rate_limited_sampling.py | 118 +++++++ .../exporter/export/trace/_utils.py | 36 +++ .../tests/trace/test_rate_limited_sampling.py | 292 ++++++++++++++++++ .../azure/monitor/opentelemetry/_configure.py | 18 +- .../azure/monitor/opentelemetry/_constants.py | 4 +- .../opentelemetry/_utils/configurations.py | 51 ++- .../tests/utils/test_configurations.py | 175 ++++++++++- 8 files changed, 675 insertions(+), 22 deletions(-) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py index 4a1a742d7092..5ef538d5fa29 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py @@ -218,6 +218,9 @@ # sampleRate _SAMPLE_RATE_KEY = "_MS.sampleRate" +_HASH = 5381 +_INTEGER_MAX: int = 2**31 - 1 +_INTEGER_MIN: int = -2**31 # AAD Auth diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py new file mode 100644 index 000000000000..c188eccc5cd7 --- /dev/null +++ 
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import math
import threading
import time
from typing import Callable, Optional, Sequence
from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanKind, format_trace_id
from opentelemetry.sdk.trace.sampling import (
    Decision,
    Sampler,
    SamplingResult,
    _get_parent_trace_state,
)
from opentelemetry.trace.span import TraceState
from opentelemetry.util.types import Attributes

from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY

from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_djb2_sample_score, _round_down_to_nearest


class _State:
    """Snapshot of the exponentially decayed sampling window.

    A new snapshot is built for every update; existing ones are never mutated.
    """

    __slots__ = ("effective_window_count", "effective_window_nanos", "last_nano_time")

    def __init__(self, effective_window_count: float, effective_window_nanos: float, last_nano_time: int):
        # Decayed number of spans observed so far.
        self.effective_window_count = effective_window_count
        # Decayed amount of elapsed time, in nanoseconds.
        self.effective_window_nanos = effective_window_nanos
        # Clock reading at which this snapshot was taken.
        self.last_nano_time = last_nano_time


class RateLimitedSamplingPercentage:
    """Derive a sampling percentage that keeps sampled spans near a target rate.

    Every call to :meth:`get` counts as one observed span. The span count and the
    elapsed time are both tracked in an exponentially decaying window, and the
    returned percentage is ``window_nanos * target_per_nano / window_count``,
    capped at 100.

    :param target_spans_per_second_limit: Desired number of sampled spans per second.
    :param nano_time_supplier: Callable returning the current time in nanoseconds.
        Defaults to ``time.monotonic_ns``.
    :param round_to_nearest: When true, the percentage is passed through
        ``_round_down_to_nearest`` before it is returned.
    :raises ValueError: If the limit is negative or NaN.
    """

    def __init__(self, target_spans_per_second_limit: float,
                 nano_time_supplier: Optional[Callable[[], int]] = None, round_to_nearest: bool = True):
        # "not (x >= 0)" also rejects NaN, which a plain "x < 0" check lets through.
        if not target_spans_per_second_limit >= 0.0:
            raise ValueError("Limit for sampled spans per second must be nonnegative!")
        # Only differences between readings are ever used, so the monotonic clock is the
        # right default: a wall-clock adjustment backwards would otherwise freeze the
        # window while the span count keeps growing.
        self._nano_time_supplier = nano_time_supplier or time.monotonic_ns
        # Hardcoded adaptation time of 0.1 seconds for adjusting to sudden changes in telemetry volumes
        adaptation_time_seconds = 0.1
        self._inverse_adaptation_time_nanos = 1e-9 / adaptation_time_seconds
        self._target_spans_per_nanosecond_limit = 1e-9 * target_spans_per_second_limit
        initial_nano_time = self._nano_time_supplier()
        self._state = _State(0.0, 0.0, initial_nano_time)
        self._lock = threading.Lock()
        self._round_to_nearest = round_to_nearest

    def _update_state(self, old_state: _State, current_nano_time: int) -> _State:
        """Return the window state after observing one more span at ``current_nano_time``."""
        if current_nano_time <= old_state.last_nano_time:
            # The clock did not advance (or another thread already recorded a later
            # reading): count the span but leave the time window untouched.
            return _State(
                old_state.effective_window_count + 1,
                old_state.effective_window_nanos,
                old_state.last_nano_time
            )
        nano_time_delta = current_nano_time - old_state.last_nano_time
        decay_factor = math.exp(-nano_time_delta * self._inverse_adaptation_time_nanos)
        current_effective_window_count = old_state.effective_window_count * decay_factor + 1
        current_effective_window_nanos = old_state.effective_window_nanos * decay_factor + nano_time_delta

        return _State(current_effective_window_count, current_effective_window_nanos, current_nano_time)

    def get(self) -> float:
        """Record one span and return the current sampling percentage (0.0 to 100.0)."""
        current_nano_time = self._nano_time_supplier()

        with self._lock:
            current_state = self._update_state(self._state, current_nano_time)
            self._state = current_state

        # _update_state always adds one to the count, so this only triggers if the
        # state was replaced from outside with a non-positive count.
        if current_state.effective_window_count == 0:
            return 100.0

        sampling_probability = (
            (current_state.effective_window_nanos * self._target_spans_per_nanosecond_limit) /
            current_state.effective_window_count
        )

        sampling_percentage = 100 * min(sampling_probability, 1.0)

        if self._round_to_nearest:
            sampling_percentage = _round_down_to_nearest(sampling_percentage)

        return sampling_percentage


class RateLimitedSampler(Sampler):
    """Sampler whose sampling percentage adapts to hold a target spans-per-second rate.

    :param target_spans_per_second_limit: Desired number of sampled spans per second.
    :raises ValueError: If the limit is negative or NaN.
    """

    def __init__(self, target_spans_per_second_limit: float):
        self._sampling_percentage_generator = RateLimitedSamplingPercentage(target_spans_per_second_limit)
        self._description = f"RateLimitedSampler{{{target_spans_per_second_limit}}}"

    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: Optional[SpanKind] = None,
        attributes: Attributes = None,
        links: Optional[Sequence["Link"]] = None,
        trace_state: Optional["TraceState"] = None,
    ) -> "SamplingResult":
        """Decide whether to sample a span from its trace id and the current percentage.

        The span is sampled when the DJB2 score of the formatted trace id is below the
        current sampling percentage. ``name``, ``kind``, ``links`` and ``trace_state``
        are not consulted; ``parent_context`` is only used to carry over the parent's
        trace state.

        :return: The decision together with a copy of ``attributes`` that additionally
            holds the sampling percentage under ``_SAMPLE_RATE_KEY``.
        """
        sampling_percentage = self._sampling_percentage_generator.get()
        sampling_score = _get_djb2_sample_score(format_trace_id(trace_id).lower())

        if sampling_score < sampling_percentage:
            decision = Decision.RECORD_AND_SAMPLE
        else:
            decision = Decision.DROP

        # Work on a copy: the caller's mapping may be immutable, and it must not pick up
        # the sample-rate key as a side effect of asking for a decision.
        sampled_attributes = dict(attributes) if attributes else {}
        sampled_attributes[_SAMPLE_RATE_KEY] = sampling_percentage

        return SamplingResult(
            decision,
            sampled_attributes,
            _get_parent_trace_state(parent_context),
        )

    def get_description(self) -> str:
        """Return the sampler description, which includes the configured limit."""
        return self._description
def _get_djb2_sample_score(trace_id_hex: str) -> float:
    """Map a trace id string to a deterministic score between 0 and 100.

    The string is hashed with DJB2 using 32-bit signed integer arithmetic and the
    absolute hash value is scaled by ``_INTEGER_MAX``.

    :param trace_id_hex: Trace id text to hash.
    :return: ``100 * abs(hash) / _INTEGER_MAX``.
    """
    # This algorithm uses 32bit integers
    hash_value = _HASH
    for char in trace_id_hex:
        hash_value = ((hash_value << 5) + hash_value) + ord(char)
        hash_value &= 0xFFFFFFFF  # simulate 32-bit integer overflow

    # Reinterpret the unsigned 32-bit value as a signed 32-bit integer.
    if hash_value & 0x80000000:
        hash_value -= 0x100000000

    # abs(_INTEGER_MIN) does not fit in 32 bits, so it is pinned to _INTEGER_MAX.
    if hash_value == _INTEGER_MIN:
        hash_value = int(_INTEGER_MAX)
    else:
        hash_value = abs(hash_value)

    return 100.0 * (float(hash_value) / _INTEGER_MAX)


def _round_down_to_nearest(sampling_percentage: float) -> float:
    """Round a sampling percentage down to the nearest value of the form ``100 / N``.

    ``N`` is ``ceil(100 / sampling_percentage)``, so the result never exceeds the input
    for inputs up to 100.

    :param sampling_percentage: Percentage to round.
    :return: The rounded percentage as a float. ``0.0`` is returned for zero, negative
        and NaN inputs and for percentages so small that ``N`` would reach
        ``_INTEGER_MAX``.
    """
    # "not (x > 0)" covers zero, negative values and NaN in a single test.
    if not sampling_percentage > 0:
        return 0.0
    item_count = 100.0 / sampling_percentage
    # Handle case where item_count is infinity or too large for math.ceil
    if not math.isfinite(item_count) or item_count >= _INTEGER_MAX:
        return 0.0
    # An infinite percentage gives item_count == 0.0; clamp N to 1 to avoid dividing by zero.
    return 100.0 / max(math.ceil(item_count), 1)
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import random
import threading
import time
import unittest

from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import (
    Decision,
    RateLimitedSampler,
    SamplingResult,
    _State,
)
from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_djb2_sample_score

from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY


class TestRateLimitedSampler(unittest.TestCase):
    """Tests for RateLimitedSampler; timing-sensitive cases run on a fake nanosecond clock."""

    def setUp(self):
        self.nano_time = [1_000_000_000_000]
        self.nano_time_supplier = lambda: self.nano_time[0]

    def advance_time(self, nanos_increment: int):
        self.nano_time[0] += nanos_increment

    def get_current_time_nanos(self) -> int:
        return self.nano_time[0]

    def _make_sampler(self, target_rate: float, round_to_nearest: bool = True) -> RateLimitedSampler:
        """Build a sampler that reads the fake clock and starts from an empty window."""
        sampler = RateLimitedSampler(target_rate)
        generator = sampler._sampling_percentage_generator
        generator._nano_time_supplier = self.nano_time_supplier
        generator._state = _State(0.0, 0.0, self.nano_time_supplier())
        generator._round_to_nearest = round_to_nearest
        return sampler

    def _assert_valid_result(self, result):
        """Check the invariants every sampling result must satisfy."""
        self.assertIsInstance(result, SamplingResult)
        self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP])
        self.assertIn(_SAMPLE_RATE_KEY, result.attributes)
        sample_rate = result.attributes[_SAMPLE_RATE_KEY]
        self.assertIsInstance(sample_rate, (int, float))
        self.assertGreaterEqual(float(sample_rate), 0.0)
        self.assertLessEqual(float(sample_rate), 100.0)

    def test_constant_rate_sampling(self):
        sampler = self._make_sampler(1000.0, round_to_nearest=False)

        nanos_between_spans = 10_000_000
        num_spans = 10
        sampled_count = 0

        for i in range(num_spans):
            self.advance_time(nanos_between_spans)

            result = sampler.should_sample(
                parent_context=None,
                trace_id=i,
                name=f"test-span-{i}"
            )

            self._assert_valid_result(result)
            if result.decision == Decision.RECORD_AND_SAMPLE:
                sampled_count += 1

        self.assertGreater(sampled_count, 0, "Should sample some spans with high target rate")

    def test_high_volume_sampling(self):
        sampler = self._make_sampler(5.0)

        nanos_between_spans = 1_000_000
        num_spans = 500
        sampled_count = 0

        # A private generator keeps the trace ids reproducible without reseeding the
        # module-level RNG shared with other tests.
        rng = random.Random(42)
        trace_ids = [rng.getrandbits(128) for _ in range(num_spans)]

        for i, trace_id in enumerate(trace_ids):
            self.advance_time(nanos_between_spans)

            result = sampler.should_sample(
                parent_context=None,
                trace_id=trace_id,
                name=f"high-volume-span-{i}"
            )

            if result.decision == Decision.RECORD_AND_SAMPLE:
                sampled_count += 1

        self.assertGreater(sampled_count, 0, "Should sample at least some spans even under high load")
        self.assertLess(sampled_count, num_spans * 0.1, "Should throttle significantly under high load")

    def test_rate_adaptation_increasing_load(self):
        sampler = self._make_sampler(20.0)

        low_rate_interval = 20_000_000
        phase1_spans = 100

        high_rate_interval = 2_000_000
        phase2_spans = 1000

        sampled_phase1 = 0
        sampled_phase2 = 0

        rng = random.Random(123)
        trace_ids_phase1 = [rng.getrandbits(128) for _ in range(phase1_spans)]
        trace_ids_phase2 = [rng.getrandbits(128) for _ in range(phase2_spans)]

        for i, trace_id in enumerate(trace_ids_phase1):
            self.advance_time(low_rate_interval)
            result = sampler.should_sample(None, trace_id, f"low-{i}")
            if result.decision == Decision.RECORD_AND_SAMPLE:
                sampled_phase1 += 1

        for i, trace_id in enumerate(trace_ids_phase2):
            self.advance_time(high_rate_interval)
            result = sampler.should_sample(None, trace_id, f"high-{i}")
            if result.decision == Decision.RECORD_AND_SAMPLE:
                sampled_phase2 += 1

        self.assertGreater(sampled_phase1, 0, "Should sample in low rate phase")
        self.assertGreater(sampled_phase2, 0, "Should sample in high rate phase")

        phase1_percentage = (sampled_phase1 / phase1_spans) * 100
        phase2_percentage = (sampled_phase2 / phase2_spans) * 100

        self.assertLess(phase2_percentage, phase1_percentage,
                        "Sampling percentage should decrease under high load")

    def test_sampler_creation(self):
        for target_rate in [0.1, 0.5, 1.0, 5.0, 100.0]:
            sampler = RateLimitedSampler(target_rate)
            self.assertIsInstance(sampler, RateLimitedSampler)
            self.assertEqual(
                sampler.get_description(),
                f"RateLimitedSampler{{{target_rate}}}"
            )

    def test_negative_rate_raises_error(self):
        with self.assertRaises(ValueError):
            RateLimitedSampler(-1.0)

    def test_zero_rate_sampling(self):
        sampler = RateLimitedSampler(0.0)

        for i in range(100):
            result = sampler.should_sample(
                parent_context=None,
                trace_id=i,
                name="test-span"
            )

            self._assert_valid_result(result)
            # A limit of zero spans per second yields a zero percentage, so nothing may be sampled.
            self.assertEqual(result.decision, Decision.DROP)
            self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 0)

    def test_sampling_decision_consistency(self):
        # One span per fake second against a limit of 50 per second keeps the percentage
        # pinned at 100, so the outcome depends on the trace id alone and not on how
        # fast the test machine happens to run.
        sampler = self._make_sampler(50.0)

        trace_id = 12345

        results = []
        for _ in range(10):
            self.advance_time(1_000_000_000)
            results.append(
                sampler.should_sample(
                    parent_context=None,
                    trace_id=trace_id,
                    name="test-span"
                )
            )

        first_decision = results[0].decision
        for result in results[1:]:
            self.assertEqual(result.decision, first_decision,
                             "Sampling decision should be consistent for same trace ID")

    def test_sampling_attributes(self):
        sampler = RateLimitedSampler(25.0)

        result = sampler.should_sample(
            parent_context=None,
            trace_id=123,
            name="test-span"
        )

        self._assert_valid_result(result)

    def test_sampler_with_extreme_trace_ids(self):
        sampler = RateLimitedSampler(1.0)

        extreme_trace_ids = [
            0,
            1,
            2**32 - 1,
            2**64 - 1,
            0xabcdef123456789,
        ]

        for trace_id in extreme_trace_ids:
            with self.subTest(trace_id=trace_id):
                result = sampler.should_sample(
                    parent_context=None,
                    trace_id=trace_id,
                    name="test-span"
                )

                self._assert_valid_result(result)

    def test_thread_safety(self):
        sampler = RateLimitedSampler(10.0)
        results = []
        errors = []

        def worker():
            try:
                for i in range(50):
                    result = sampler.should_sample(None, i, f"thread-span-{i}")
                    results.append(result)
                    time.sleep(0.001)
            except Exception as e:  # pylint: disable=broad-except
                # Collected rather than raised so the main thread can report every failure.
                errors.append(e)

        threads = [threading.Thread(target=worker) for _ in range(5)]

        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()

        self.assertEqual(len(errors), 0, f"Thread safety errors: {errors}")
        self.assertGreater(len(results), 0)
        for result in results:
            self._assert_valid_result(result)


class TestUtilityFunctions(unittest.TestCase):
    """Tests for the DJB2 sample-score helper."""

    def test_djb2_hash_consistency(self):
        trace_id = "test-trace-id-12345"

        scores = [_get_djb2_sample_score(trace_id) for _ in range(10)]

        self.assertTrue(all(score == scores[0] for score in scores))

    def test_djb2_hash_edge_cases(self):
        edge_cases = [
            "",
            "0",
            "a" * 1000,
            "0123456789abcdef" * 8,
        ]

        for trace_id in edge_cases:
            with self.subTest(trace_id=trace_id):
                score = _get_djb2_sample_score(trace_id)
                self.assertIsInstance(score, float)
                self.assertGreaterEqual(score, 0)
                self.assertLess(score, 100)
sampler=RateLimitedSampler(sampling_ratio=cast(float, sampling_traces_per_second), resource=resource) + ) + else: + sampling_ratio = configurations[SAMPLING_RATIO_ARG] + tracer_provider = TracerProvider( + sampler=ApplicationInsightsSampler(sampling_ratio=cast(float, sampling_ratio)), resource=resource + ) + + for span_processor in configurations[SPAN_PROCESSORS_ARG]: # type: ignore tracer_provider.add_span_processor(span_processor) # type: ignore if configurations.get(ENABLE_LIVE_METRICS_ARG): diff --git a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_constants.py b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_constants.py index af0d9ec0fedb..5d6c3a0ea71b 100644 --- a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_constants.py @@ -24,7 +24,9 @@ SAMPLING_RATIO_ARG = "sampling_ratio" SPAN_PROCESSORS_ARG = "span_processors" VIEWS_ARG = "views" - +RATE_LIMITED_SAMPLER = "microsoft.rate_limited" +FIXED_PERCENTAGE_SAMPLER = "microsoft.fixed.percentage" +SAMPLING_TRACES_PER_SECOND_ARG = "sampling_traces_per_second" # --------------------Autoinstrumentation Configuration------------------------------------------ diff --git a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_utils/configurations.py b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_utils/configurations.py index c84a6bdb2cff..714534b1b218 100644 --- a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_utils/configurations.py +++ b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_utils/configurations.py @@ -19,6 +19,7 @@ from opentelemetry.sdk.environment_variables import ( OTEL_EXPERIMENTAL_RESOURCE_DETECTORS, OTEL_TRACES_SAMPLER_ARG, + OTEL_TRACES_SAMPLER ) from opentelemetry.sdk.resources import Resource @@ -37,25 +38,25 @@ LOGGING_FORMATTER_ARG, RESOURCE_ARG, 
def _parse_sampler_arg(invalid_message, default):
    """Read OTEL_TRACES_SAMPLER_ARG as a finite, non-negative float.

    :param invalid_message: %-style log template taking the variable name, the default
        being applied and a description of the problem.
    :param default: Value returned when the variable is missing or unusable.
    :return: The parsed value, or ``default`` after logging why it was rejected.
    """
    raw_value = environ.get(OTEL_TRACES_SAMPLER_ARG)
    if raw_value is None:
        _logger.error(invalid_message, OTEL_TRACES_SAMPLER_ARG, default, "environment variable is not set")
        return default
    try:
        value = float(raw_value)
    except ValueError as e:
        _logger.error(invalid_message, OTEL_TRACES_SAMPLER_ARG, default, e)
        return default
    # The chained comparison is False for negative numbers, infinity and NaN alike.
    if not 0 <= value < float("inf"):
        _logger.error(
            invalid_message,
            OTEL_TRACES_SAMPLER_ARG,
            default,
            "value must be finite and non-negative",
        )
        return default
    return value


def _default_sampling_ratio(configurations):
    """Fill in the sampling configuration from the OTEL sampler environment variables.

    * ``OTEL_TRACES_SAMPLER == RATE_LIMITED_SAMPLER``: store the argument under
      ``SAMPLING_TRACES_PER_SECOND_ARG``.
    * ``OTEL_TRACES_SAMPLER == FIXED_PERCENTAGE_SAMPLER``: store the argument under
      ``SAMPLING_RATIO_ARG``.
    * anything else: store the default ratio of 1.0 under ``SAMPLING_RATIO_ARG``.

    A missing, unparsable, negative or non-finite argument is logged and replaced by
    1.0; this function never raises because of a bad environment value.

    :param configurations: Configuration dictionary, updated in place.
    """
    default = 1.0
    sampler_type = environ.get(OTEL_TRACES_SAMPLER)

    if sampler_type == RATE_LIMITED_SAMPLER:
        configurations[SAMPLING_TRACES_PER_SECOND_ARG] = _parse_sampler_arg(
            _INVALID_TRACES_PER_SECOND_MESSAGE, default
        )
    elif sampler_type == FIXED_PERCENTAGE_SAMPLER:
        configurations[SAMPLING_RATIO_ARG] = _parse_sampler_arg(_INVALID_FLOAT_MESSAGE, default)
    else:
        # Default behavior - always set sampling_ratio
        configurations[SAMPLING_RATIO_ARG] = default
        if sampler_type is not None:
            _logger.error(
                "Unsupported value %r for %s. Supported values are %s and %s. "
                "Defaulting to a sampling ratio of %s.",
                sampler_type,
                OTEL_TRACES_SAMPLER,
                RATE_LIMITED_SAMPLER,
                FIXED_PERCENTAGE_SAMPLER,
                default,
            )
        elif OTEL_TRACES_SAMPLER_ARG in environ:
            # Staying silent here would hide that the argument has no effect on its own.
            _logger.warning(
                "%s is ignored because %s is not set. Defaulting to a sampling ratio of %s.",
                OTEL_TRACES_SAMPLER_ARG,
                OTEL_TRACES_SAMPLER,
                default,
            )
self.assertEqual(configurations["sampling_ratio"], 0.5) + self.assertEqual(configurations["sampling_ratio"], 1.0) @patch.dict( "os.environ", { - SAMPLING_RATIO_ENV_VAR: "Half", + OTEL_TRACES_SAMPLER: FIXED_PERCENTAGE_SAMPLER, + OTEL_TRACES_SAMPLER_ARG: "Half", OTEL_TRACES_EXPORTER: "False", OTEL_LOGS_EXPORTER: "no", OTEL_METRICS_EXPORTER: "True", @@ -181,7 +189,7 @@ def test_get_configurations_env_vars(self, resource_create_mock): @patch("opentelemetry.sdk.resources.Resource.create", return_value=TEST_DEFAULT_RESOURCE) def test_get_configurations_env_vars_validation(self, resource_create_mock): configurations = _get_configurations() - + print(configurations) self.assertTrue("connection_string" not in configurations) self.assertEqual(configurations["disable_logging"], False) self.assertEqual(configurations["disable_metrics"], False) @@ -260,3 +268,156 @@ def test_merge_instrumentation_options_extra_args(self, resource_create_mock): "urllib3": {"enabled": True}, }, ) + @patch.dict( + "os.environ", + { + OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: "flask,requests,fastapi,azure_sdk", + OTEL_TRACES_SAMPLER: RATE_LIMITED_SAMPLER, + OTEL_TRACES_SAMPLER_ARG: "0.5", + OTEL_TRACES_EXPORTER: "None", + OTEL_LOGS_EXPORTER: "none", + OTEL_METRICS_EXPORTER: "NONE", + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS: "custom_resource_detector", + }, + clear=True, + ) + @patch("opentelemetry.sdk.resources.Resource.create", return_value=TEST_DEFAULT_RESOURCE) + def test_get_configurations_env_vars_rate_limited(self, resource_create_mock): + configurations = _get_configurations() + + self.assertTrue("connection_string" not in configurations) + self.assertEqual(configurations["disable_logging"], True) + self.assertEqual(configurations["disable_metrics"], True) + self.assertEqual(configurations["disable_tracing"], True) + self.assertEqual( + configurations["instrumentation_options"], + { + "azure_sdk": {"enabled": False}, + "django": {"enabled": True}, + "fastapi": {"enabled": False}, + "flask": 
{"enabled": False}, + "psycopg2": {"enabled": True}, + "requests": {"enabled": False}, + "urllib": {"enabled": True}, + "urllib3": {"enabled": True}, + }, + ) + self.assertEqual(configurations["resource"].attributes, TEST_DEFAULT_RESOURCE.attributes) + self.assertEqual(environ[OTEL_EXPERIMENTAL_RESOURCE_DETECTORS], "custom_resource_detector") + resource_create_mock.assert_called_once_with() + self.assertEqual(configurations["sampling_traces_per_second"], 0.5) + + @patch.dict( + "os.environ", + { + OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: "flask,requests,fastapi,azure_sdk", + OTEL_TRACES_SAMPLER_ARG: "34", + OTEL_TRACES_EXPORTER: "None", + OTEL_LOGS_EXPORTER: "none", + OTEL_METRICS_EXPORTER: "NONE", + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS: "custom_resource_detector", + }, + clear=True, + ) + @patch("opentelemetry.sdk.resources.Resource.create", return_value=TEST_DEFAULT_RESOURCE) + def test_get_configurations_env_vars_no_preference(self, resource_create_mock): + configurations = _get_configurations() + + self.assertTrue("connection_string" not in configurations) + self.assertEqual(configurations["disable_logging"], True) + self.assertEqual(configurations["disable_metrics"], True) + self.assertEqual(configurations["disable_tracing"], True) + self.assertEqual( + configurations["instrumentation_options"], + { + "azure_sdk": {"enabled": False}, + "django": {"enabled": True}, + "fastapi": {"enabled": False}, + "flask": {"enabled": False}, + "psycopg2": {"enabled": True}, + "requests": {"enabled": False}, + "urllib": {"enabled": True}, + "urllib3": {"enabled": True}, + }, + ) + self.assertEqual(configurations["resource"].attributes, TEST_DEFAULT_RESOURCE.attributes) + self.assertEqual(environ[OTEL_EXPERIMENTAL_RESOURCE_DETECTORS], "custom_resource_detector") + resource_create_mock.assert_called_once_with() + self.assertEqual(configurations["sampling_ratio"], 1.0) + + @patch.dict( + "os.environ", + { + OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: "flask,requests,fastapi,azure_sdk", 
+ OTEL_TRACES_SAMPLER_ARG: "2 traces per second", + OTEL_TRACES_EXPORTER: "None", + OTEL_LOGS_EXPORTER: "none", + OTEL_METRICS_EXPORTER: "NONE", + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS: "custom_resource_detector", + }, + clear=True, + ) + @patch("opentelemetry.sdk.resources.Resource.create", return_value=TEST_DEFAULT_RESOURCE) + def test_get_configurations_env_vars_check_default(self, resource_create_mock): + configurations = _get_configurations() + + self.assertTrue("connection_string" not in configurations) + self.assertEqual(configurations["disable_logging"], True) + self.assertEqual(configurations["disable_metrics"], True) + self.assertEqual(configurations["disable_tracing"], True) + self.assertEqual( + configurations["instrumentation_options"], + { + "azure_sdk": {"enabled": False}, + "django": {"enabled": True}, + "fastapi": {"enabled": False}, + "flask": {"enabled": False}, + "psycopg2": {"enabled": True}, + "requests": {"enabled": False}, + "urllib": {"enabled": True}, + "urllib3": {"enabled": True}, + }, + ) + self.assertEqual(configurations["resource"].attributes, TEST_DEFAULT_RESOURCE.attributes) + self.assertEqual(environ[OTEL_EXPERIMENTAL_RESOURCE_DETECTORS], "custom_resource_detector") + resource_create_mock.assert_called_once_with() + self.assertEqual(configurations["sampling_ratio"], 1.0) + + @patch.dict( + "os.environ", + { + OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: "flask,requests,fastapi,azure_sdk", + OTEL_TRACES_SAMPLER: FIXED_PERCENTAGE_SAMPLER, + OTEL_TRACES_SAMPLER_ARG: "0.9", + OTEL_TRACES_EXPORTER: "None", + OTEL_LOGS_EXPORTER: "none", + OTEL_METRICS_EXPORTER: "NONE", + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS: "custom_resource_detector", + }, + clear=True, + ) + @patch("opentelemetry.sdk.resources.Resource.create", return_value=TEST_DEFAULT_RESOURCE) + def test_get_configurations_env_vars_fixed_percentage(self, resource_create_mock): + configurations = _get_configurations() + + self.assertTrue("connection_string" not in configurations) + 
self.assertEqual(configurations["disable_logging"], True) + self.assertEqual(configurations["disable_metrics"], True) + self.assertEqual(configurations["disable_tracing"], True) + self.assertEqual( + configurations["instrumentation_options"], + { + "azure_sdk": {"enabled": False}, + "django": {"enabled": True}, + "fastapi": {"enabled": False}, + "flask": {"enabled": False}, + "psycopg2": {"enabled": True}, + "requests": {"enabled": False}, + "urllib": {"enabled": True}, + "urllib3": {"enabled": True}, + }, + ) + self.assertEqual(configurations["resource"].attributes, TEST_DEFAULT_RESOURCE.attributes) + self.assertEqual(environ[OTEL_EXPERIMENTAL_RESOURCE_DETECTORS], "custom_resource_detector") + resource_create_mock.assert_called_once_with() + self.assertEqual(configurations["sampling_ratio"], 0.9) \ No newline at end of file From 704d5faf93a78b32beabde4ae1e18c4687ef2a6f Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Tue, 8 Jul 2025 07:24:43 -0700 Subject: [PATCH 2/6] Added CHANGELOG --- sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md | 2 ++ sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 95da43368e03..f5eee977632d 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -6,6 +6,8 @@ - Detect synthetically created telemetry based on the user-agent header ([#41733](https://github.com/Azure/azure-sdk-for-python/pull/41733)) +- Added RateLimited Sampler + ([#41925](https://github.com/Azure/azure-sdk-for-python/pull/41925)) ### Breaking Changes diff --git a/sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md index 2f8986a85ba1..6fe0ca210d19 100644 --- a/sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md +++ 
b/sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md @@ -3,6 +3,8 @@ ## 1.6.11 (Unreleased) ### Features Added +- Added RateLimited Sampler + ([#41925](https://github.com/Azure/azure-sdk-for-python/pull/41925)) ### Breaking Changes From 7183758f3f21724975ffcdece6ebc69e1542082b Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Tue, 8 Jul 2025 08:07:26 -0700 Subject: [PATCH 3/6] Fixed linting errors --- .../azure/monitor/opentelemetry/_configure.py | 1 - .../monitor/opentelemetry/_utils/configurations.py | 11 +++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py index cfefe1d3ba7b..7f00d30be3bb 100644 --- a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py +++ b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py @@ -145,7 +145,6 @@ def _setup_tracing(configurations: Dict[str, ConfigurationValue]): tracer_provider = TracerProvider( sampler=ApplicationInsightsSampler(sampling_ratio=cast(float, sampling_ratio)), resource=resource ) - for span_processor in configurations[SPAN_PROCESSORS_ARG]: # type: ignore tracer_provider.add_span_processor(span_processor) # type: ignore diff --git a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_utils/configurations.py b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_utils/configurations.py index 714534b1b218..b1f66bc6b7be 100644 --- a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_utils/configurations.py +++ b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_utils/configurations.py @@ -123,12 +123,12 @@ def _default_resource(configurations): def _default_sampling_ratio(configurations): default = 1.0 - + if environ.get(OTEL_TRACES_SAMPLER_ARG) is not None: try: if float(environ[OTEL_TRACES_SAMPLER_ARG]) 
< 0: _logger.error("Invalid value for OTEL_TRACES_SAMPLER_ARG. It should be a non-negative number.") - except: + except ValueError: pass else: _logger.error("OTEL_TRACES_SAMPLER_ARG is not set.") @@ -161,8 +161,11 @@ def _default_sampling_ratio(configurations): else: # Default behavior - always set sampling_ratio configurations[SAMPLING_RATIO_ARG] = default - _logger.error( # pylint: disable=C - f"Invalid argument for the sampler to be used for tracing. Supported values are {RATE_LIMITED_SAMPLER} and {FIXED_PERCENTAGE_SAMPLER}. Defaulting to %s: %s", + _logger.error( + "Invalid argument for the sampler to be used for tracing. " + "Supported values are %s and %s. Defaulting to %s: %s", + RATE_LIMITED_SAMPLER, + FIXED_PERCENTAGE_SAMPLER, OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG, ) From 36596b1ce57355dfe4873c15d3ac15890cef3b52 Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Tue, 8 Jul 2025 10:51:58 -0700 Subject: [PATCH 4/6] Added logic for sampling decision for parent span --- .../export/trace/_rate_limited_sampling.py | 50 ++++- .../tests/trace/test_rate_limited_sampling.py | 204 +++++++++++++++++- .../azure/monitor/opentelemetry/_configure.py | 4 +- 3 files changed, 250 insertions(+), 8 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py index c188eccc5cd7..e7f05624b5f7 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py @@ -6,7 +6,7 @@ import time from typing import Callable, Optional, Sequence from opentelemetry.context import Context -from opentelemetry.trace import Link, SpanKind, format_trace_id +from 
opentelemetry.trace import Link, SpanKind, format_trace_id, get_current_span from opentelemetry.sdk.trace.sampling import ( Decision, Sampler, @@ -96,6 +96,46 @@ def should_sample( links: Optional[Sequence["Link"]] = None, trace_state: Optional["TraceState"] = None, ) -> "SamplingResult": + + if parent_context is not None: + parent_span = get_current_span(parent_context) + parent_span_context = parent_span.get_span_context() + + # Check if parent is valid and local (not remote) + if parent_span_context.is_valid and not parent_span_context.is_remote: + # Check if parent was dropped/record-only first + if not parent_span.is_recording(): + # Parent was dropped, drop this child too + if attributes is None: + new_attributes = {} + else: + new_attributes = dict(attributes) + new_attributes[_SAMPLE_RATE_KEY] = 0.0 + + return SamplingResult( + Decision.DROP, + new_attributes, + _get_parent_trace_state(parent_context), + ) + + # Parent is recording, check for sample rate attribute + parent_attributes = getattr(parent_span, 'attributes', {}) + parent_sample_rate = parent_attributes.get(_SAMPLE_RATE_KEY) + + if parent_sample_rate is not None: + # Honor parent's sampling rate + if attributes is None: + new_attributes = {} + else: + new_attributes = dict(attributes) + new_attributes[_SAMPLE_RATE_KEY] = parent_sample_rate + + return SamplingResult( + Decision.RECORD_AND_SAMPLE, + new_attributes, + _get_parent_trace_state(parent_context), + ) + sampling_percentage = self._sampling_percentage_generator.get() sampling_score = _get_djb2_sample_score(format_trace_id(trace_id).lower()) @@ -105,12 +145,14 @@ def should_sample( decision = Decision.DROP if attributes is None: - attributes = {} - attributes[_SAMPLE_RATE_KEY] = sampling_percentage # type: ignore + new_attributes = {} + else: + new_attributes = dict(attributes) + new_attributes[_SAMPLE_RATE_KEY] = sampling_percentage return SamplingResult( decision, - attributes, + new_attributes, _get_parent_trace_state(parent_context), ) 
diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py index 5a1a7fb12bc0..4b05d43e1cb7 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py @@ -1,12 +1,17 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. + import random import sys import threading import time import unittest -from typing import List -from unittest.mock import patch +from typing import List, Optional +from unittest.mock import patch, Mock + +from opentelemetry.context import Context +from opentelemetry.trace import SpanContext, TraceFlags, set_span_in_context +from opentelemetry.trace.span import NonRecordingSpan from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import ( Decision, @@ -18,6 +23,29 @@ from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY +def create_parent_span(sampled: bool, sample_rate: Optional[float] = None, is_remote: bool = False): + trace_flags = TraceFlags.SAMPLED if sampled else TraceFlags.DEFAULT + + span_context = SpanContext( + trace_id=0x1234567890abcdef1234567890abcdef, + span_id=0x1234567890abcdef, + is_remote=is_remote, + trace_flags=trace_flags, + trace_state=None, + ) + + mock_span = Mock() + mock_span.get_span_context.return_value = span_context + mock_span.is_recording.return_value = sampled + + attributes = {} + if sample_rate is not None: + attributes[_SAMPLE_RATE_KEY] = sample_rate + mock_span.attributes = attributes + + return mock_span + + class TestRateLimitedSampler(unittest.TestCase): def setUp(self): @@ -259,6 +287,178 @@ def worker(): self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) self.assertIn(_SAMPLE_RATE_KEY, result.attributes) + def 
test_parent_span_sampled_with_sample_rate(self): + sampler = RateLimitedSampler(10.0) + + parent_span = create_parent_span(sampled=True, sample_rate=75.0, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertEqual(result.decision, Decision.RECORD_AND_SAMPLE) + self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 75.0) + + def test_parent_span_not_sampled_with_sample_rate(self): + sampler = RateLimitedSampler(10.0) + + parent_span = create_parent_span(sampled=False, sample_rate=25.0, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertEqual(result.decision, Decision.DROP) + self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 0.0) + + def test_parent_span_sampled_with_100_percent_sample_rate(self): + sampler = RateLimitedSampler(5.0) + + parent_span = create_parent_span(sampled=True, sample_rate=100.0, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertEqual(result.decision, Decision.RECORD_AND_SAMPLE) + self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 100.0) + + def test_parent_span_remote_ignored(self): + sampler = RateLimitedSampler(5.0) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + + parent_span = create_parent_span(sampled=True, sample_rate=80.0, is_remote=True) + + with 
patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + self.advance_time(100_000_000) + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertNotEqual(result.attributes[_SAMPLE_RATE_KEY], 80.0) + + def test_parent_span_no_sample_rate_attribute(self): + sampler = RateLimitedSampler(5.0) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + + parent_span = create_parent_span(sampled=True, sample_rate=None, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + self.advance_time(100_000_000) + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + sample_rate = result.attributes[_SAMPLE_RATE_KEY] + self.assertIsInstance(sample_rate, (int, float)) + + def test_parent_span_invalid_context(self): + sampler = RateLimitedSampler(5.0) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + + parent_span = Mock() + invalid_context = Mock() + invalid_context.is_valid = False + invalid_context.is_remote = False + parent_span.get_span_context.return_value = invalid_context + + with 
patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + self.advance_time(100_000_000) + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span" + ) + + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + sample_rate = result.attributes[_SAMPLE_RATE_KEY] + self.assertIsInstance(sample_rate, (int, float)) + + def test_no_parent_context_uses_local_sampling(self): + sampler = RateLimitedSampler(5.0) + sampler._sampling_percentage_generator._nano_time_supplier = self.nano_time_supplier + + from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import _State + initial_time = self.nano_time_supplier() + sampler._sampling_percentage_generator._state = _State(0.0, 0.0, initial_time) + + self.advance_time(100_000_000) + + result = sampler.should_sample( + parent_context=None, + trace_id=0xabc123, + name="test-span" + ) + + self.assertIn(result.decision, [Decision.RECORD_AND_SAMPLE, Decision.DROP]) + sample_rate = result.attributes[_SAMPLE_RATE_KEY] + self.assertIsInstance(sample_rate, (int, float)) + + def test_parent_context_preserves_original_attributes(self): + sampler = RateLimitedSampler(10.0) + + parent_span = create_parent_span(sampled=True, sample_rate=50.0, is_remote=False) + + with patch('azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling.get_current_span', return_value=parent_span): + context = Mock() + + original_attributes = { + "service.name": "test-service", + "operation.name": "test-operation" + } + + result = sampler.should_sample( + parent_context=context, + trace_id=0xabc123, + name="test-span", + attributes=original_attributes + ) + + 
self.assertEqual(result.decision, Decision.RECORD_AND_SAMPLE) + self.assertEqual(result.attributes["service.name"], "test-service") + self.assertEqual(result.attributes["operation.name"], "test-operation") + self.assertEqual(result.attributes[_SAMPLE_RATE_KEY], 50.0) class TestUtilityFunctions(unittest.TestCase): diff --git a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py index 7f00d30be3bb..0331da7b7d95 100644 --- a/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py +++ b/sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py @@ -136,9 +136,9 @@ def configure_azure_monitor(**kwargs) -> None: # pylint: disable=C4758 def _setup_tracing(configurations: Dict[str, ConfigurationValue]): resource: Resource = configurations[RESOURCE_ARG] # type: ignore if SAMPLING_TRACES_PER_SECOND_ARG in configurations: - sampling_traces_per_second = configurations[SAMPLING_TRACES_PER_SECOND_ARG] + traces_per_second = configurations[SAMPLING_TRACES_PER_SECOND_ARG] tracer_provider = TracerProvider( - sampler=RateLimitedSampler(sampling_ratio=cast(float, sampling_traces_per_second), resource=resource) + sampler=RateLimitedSampler(cast(float, traces_per_second)), resource=resource ) else: sampling_ratio = configurations[SAMPLING_RATIO_ARG] From 481874fef2a49136edb6b047f05affcb814f7f31 Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Tue, 8 Jul 2025 12:43:33 -0700 Subject: [PATCH 5/6] Fixed import error --- .../azure/monitor/opentelemetry/exporter/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py index feaa0fec8584..62576430d497 100644 ---
a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py @@ -8,6 +8,7 @@ from azure.monitor.opentelemetry.exporter.export.metrics._exporter import AzureMonitorMetricExporter from azure.monitor.opentelemetry.exporter.export.trace._exporter import AzureMonitorTraceExporter from azure.monitor.opentelemetry.exporter.export.trace._sampling import ApplicationInsightsSampler +from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import RateLimitedSampler from ._version import VERSION __all__ = [ From 0a491dcb9c9630e0310bb3afec559e60ca234f10 Mon Sep 17 00:00:00 2001 From: Radhika Gupta Date: Tue, 8 Jul 2025 13:02:16 -0700 Subject: [PATCH 6/6] Fixed init import error --- .../azure/monitor/opentelemetry/exporter/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py index 62576430d497..c1788f1589aa 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py @@ -16,5 +16,6 @@ "AzureMonitorMetricExporter", "AzureMonitorLogExporter", "AzureMonitorTraceExporter", + "RateLimitedSampler", ] __version__ = VERSION