Skip to content

Commit c127567

Browse files
committed
Added changes for rate limited sampler azure-exporter
1 parent 27ed0de commit c127567

File tree

5 files changed

+690
-0
lines changed

5 files changed

+690
-0
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
from azure.monitor.opentelemetry.exporter.export.metrics._exporter import AzureMonitorMetricExporter
99
from azure.monitor.opentelemetry.exporter.export.trace._exporter import AzureMonitorTraceExporter
1010
from azure.monitor.opentelemetry.exporter.export.trace._sampling import ApplicationInsightsSampler
11+
from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import RateLimitedSampler
1112
from ._version import VERSION
1213

1314
__all__ = [
1415
"ApplicationInsightsSampler",
16+
"RateLimitedSampler",
1517
"AzureMonitorMetricExporter",
1618
"AzureMonitorLogExporter",
1719
"AzureMonitorTraceExporter",

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,9 @@ class _RP_Names(Enum):
287287
# sampleRate
288288

289289
_SAMPLE_RATE_KEY = "_MS.sampleRate"
290+
# Starting value of the DJB2 string hash that turns a trace ID into a sample score.
_HASH = 5381
291+
# Largest value of a signed 32-bit integer; the sample-score hash is computed in 32-bit arithmetic.
_INTEGER_MAX: int = 2**31 - 1
292+
# Smallest value of a signed 32-bit integer.
_INTEGER_MIN: int = -2**31
290293

291294
# AAD Auth
292295

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
import math
5+
import threading
6+
import time
7+
from typing import Callable, Optional, Sequence
8+
from opentelemetry.context import Context
9+
from opentelemetry.trace import Link, SpanKind, format_trace_id, get_current_span
10+
from opentelemetry.sdk.trace.sampling import (
11+
Decision,
12+
Sampler,
13+
SamplingResult,
14+
_get_parent_trace_state,
15+
)
16+
from opentelemetry.trace.span import TraceState
17+
from opentelemetry.util.types import Attributes
18+
19+
from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY
20+
21+
from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_djb2_sample_score, _round_down_to_nearest
22+
23+
class _State:
24+
def __init__(self, effective_window_count: float, effective_window_nanos: float, last_nano_time: int):
25+
self.effective_window_count = effective_window_count
26+
self.effective_window_nanos = effective_window_nanos
27+
self.last_nano_time = last_nano_time
28+
29+
class RateLimitedSamplingPercentage:
30+
def __init__(self, target_spans_per_second_limit: float,
31+
nano_time_supplier: Optional[Callable[[], int]] = None, round_to_nearest: bool = True):
32+
if target_spans_per_second_limit < 0.0:
33+
raise ValueError("Limit for sampled spans per second must be nonnegative!")
34+
self._nano_time_supplier = nano_time_supplier or (lambda: int(time.time_ns()))
35+
# Hardcoded adaptation time of 0.1 seconds for adjusting to sudden changes in telemetry volumes
36+
adaptation_time_seconds = 0.1
37+
self._inverse_adaptation_time_nanos = 1e-9 / adaptation_time_seconds
38+
self._target_spans_per_nanosecond_limit = 1e-9 * target_spans_per_second_limit
39+
initial_nano_time = self._nano_time_supplier()
40+
self._state = _State(0.0, 0.0, initial_nano_time)
41+
self._lock = threading.Lock()
42+
self._round_to_nearest = round_to_nearest
43+
44+
def _update_state(self, old_state: _State, current_nano_time: int) -> _State:
45+
if current_nano_time <= old_state.last_nano_time:
46+
return _State(
47+
old_state.effective_window_count + 1,
48+
old_state.effective_window_nanos,
49+
old_state.last_nano_time
50+
)
51+
nano_time_delta = current_nano_time - old_state.last_nano_time
52+
decay_factor = math.exp(-nano_time_delta * self._inverse_adaptation_time_nanos)
53+
current_effective_window_count = old_state.effective_window_count * decay_factor + 1
54+
current_effective_window_nanos = old_state.effective_window_nanos * decay_factor + nano_time_delta
55+
56+
return _State(current_effective_window_count, current_effective_window_nanos, current_nano_time)
57+
58+
def get(self) -> float:
59+
"""Get the current sampling percentage (0.0 to 100.0)."""
60+
current_nano_time = self._nano_time_supplier()
61+
62+
with self._lock:
63+
old_state = self._state
64+
self._state = self._update_state(self._state, current_nano_time)
65+
current_state = self._state
66+
67+
# Calculate sampling probability based on current state
68+
if current_state.effective_window_count == 0:
69+
return 100.0
70+
71+
sampling_probability = (
72+
(current_state.effective_window_nanos * self._target_spans_per_nanosecond_limit) /
73+
current_state.effective_window_count
74+
)
75+
76+
sampling_percentage = 100 * min(sampling_probability, 1.0)
77+
78+
if self._round_to_nearest:
79+
sampling_percentage = _round_down_to_nearest(sampling_percentage)
80+
81+
return sampling_percentage
82+
83+
84+
class RateLimitedSampler(Sampler):
    """Sampler that adapts its sampling percentage to a target number of spans per second.

    Spans with a valid, local parent follow the parent: they are dropped when the
    parent is not recording, and they reuse the parent's ``_MS.sampleRate`` when
    the parent carries one. All other spans are decided by comparing a DJB2 score
    of the trace ID with the current rate-limited sampling percentage.

    :param target_spans_per_second_limit: Desired upper bound on sampled spans per second.
    """

    def __init__(self, target_spans_per_second_limit: float):
        self._sampling_percentage_generator = RateLimitedSamplingPercentage(target_spans_per_second_limit)
        self._description = f"RateLimitedSampler{{{target_spans_per_second_limit}}}"

    @staticmethod
    def _with_sample_rate(attributes: Attributes, sample_rate) -> dict:
        """Return a copy of ``attributes`` with the sample rate attribute set."""
        new_attributes = {} if attributes is None else dict(attributes)
        new_attributes[_SAMPLE_RATE_KEY] = sample_rate
        return new_attributes

    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: Optional[SpanKind] = None,
        attributes: Attributes = None,
        links: Optional[Sequence["Link"]] = None,
        trace_state: Optional["TraceState"] = None,
    ) -> "SamplingResult":
        """Decide whether the span should be recorded and sampled.

        :param parent_context: Context holding the parent span, if any.
        :param trace_id: Trace ID of the span being created.
        :param name: Span name (not used for the decision).
        :param kind: Span kind (not used for the decision).
        :param attributes: Initial span attributes; copied, never mutated.
        :param links: Span links (not used for the decision).
        :param trace_state: Trace state (not used; the parent's trace state is propagated).
        :return: The sampling result, whose attributes include ``_MS.sampleRate``.
        """
        if parent_context is not None:
            parent_span = get_current_span(parent_context)
            parent_span_context = parent_span.get_span_context()

            # Only a valid, local (non-remote) parent dictates the decision.
            if parent_span_context.is_valid and not parent_span_context.is_remote:
                if not parent_span.is_recording():
                    # Parent was dropped, drop this child too.
                    return SamplingResult(
                        Decision.DROP,
                        self._with_sample_rate(attributes, 0.0),
                        _get_parent_trace_state(parent_context),
                    )

                # ``or {}`` also covers spans whose ``attributes`` is None, which
                # would otherwise fail on ``.get``.
                parent_attributes = getattr(parent_span, "attributes", None) or {}
                parent_sample_rate = parent_attributes.get(_SAMPLE_RATE_KEY)

                if parent_sample_rate is not None:
                    # Honor parent's sampling rate.
                    return SamplingResult(
                        Decision.RECORD_AND_SAMPLE,
                        self._with_sample_rate(attributes, parent_sample_rate),
                        _get_parent_trace_state(parent_context),
                    )

        # Root span, remote parent, or parent without a sample rate: decide here.
        sampling_percentage = self._sampling_percentage_generator.get()
        sampling_score = _get_djb2_sample_score(format_trace_id(trace_id).lower())

        if sampling_score < sampling_percentage:
            decision = Decision.RECORD_AND_SAMPLE
        else:
            decision = Decision.DROP

        return SamplingResult(
            decision,
            self._with_sample_rate(attributes, sampling_percentage),
            _get_parent_trace_state(parent_context),
        )

    def get_description(self) -> str:
        """Return the sampler description, which embeds the configured limit."""
        return self._description

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from typing import no_type_check, Optional, Tuple
55
from urllib.parse import urlparse
6+
import math
67

78
from opentelemetry.semconv.attributes import (
89
client_attributes,
@@ -13,6 +14,11 @@
1314
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
1415
from opentelemetry.util.types import Attributes
1516

17+
from azure.monitor.opentelemetry.exporter._constants import (
18+
_HASH,
19+
_INTEGER_MAX,
20+
_INTEGER_MIN
21+
)
1622

1723
# pylint:disable=too-many-return-statements
1824
def _get_default_port_db(db_system: str) -> int:
@@ -320,3 +326,33 @@ def _get_url_for_http_request(attributes: Attributes) -> Optional[str]:
320326
http_target,
321327
)
322328
return url
329+
330+
def _get_djb2_sample_score(trace_id_hex: str) -> float:
    """Map a trace ID string to a score between 0.0 and 100.0 using a 32-bit DJB2 hash.

    :param trace_id_hex: Trace ID as a string; every character feeds the hash.
    :return: ``100 * |hash| / _INTEGER_MAX`` where ``hash`` is the signed 32-bit digest.
    """
    mask_32 = 0xFFFFFFFF
    digest = _HASH
    for character in trace_id_hex:
        # hash * 33 + c, truncated to 32 bits after every step
        digest = (digest * 33 + ord(character)) & mask_32

    # Reinterpret the unsigned 32-bit value as a two's-complement signed integer.
    if digest >= 0x80000000:
        digest -= 0x100000000

    # abs(_INTEGER_MIN) does not fit in 32 bits, so it is clamped to _INTEGER_MAX.
    magnitude = int(_INTEGER_MAX) if digest == _INTEGER_MIN else abs(digest)

    return 100.0 * (float(magnitude) / _INTEGER_MAX)
347+
348+
def _round_down_to_nearest(sampling_percentage: float) -> float:
349+
if sampling_percentage == 0:
350+
return 0
351+
# Handle extremely small percentages that would cause overflow
352+
if sampling_percentage <= _INTEGER_MIN: # Extremely small threshold
353+
return 0.0
354+
item_count = 100.0 / sampling_percentage
355+
# Handle case where item_count is infinity or too large for math.ceil
356+
if not math.isfinite(item_count) or item_count >= _INTEGER_MAX:
357+
return 0.0
358+
return 100.0 / math.ceil(item_count)

0 commit comments

Comments
 (0)