Skip to content

Commit 21e19b8

Browse files
committed
Added RateLimitedSampler
1 parent d850231 commit 21e19b8

File tree

8 files changed

+675
-22
lines changed

8 files changed

+675
-22
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@
218218
# sampleRate
219219

220220
_SAMPLE_RATE_KEY = "_MS.sampleRate"
221+
# Initial seed of the DJB2 string hash used to score trace ids for sampling.
_HASH = 5381
222+
# Largest value representable by a signed 32-bit integer.
_INTEGER_MAX: int = 2**31 - 1
223+
# Smallest value representable by a signed 32-bit integer.
_INTEGER_MIN: int = -2**31
221224

222225
# AAD Auth
223226

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
import math
5+
import threading
6+
import time
7+
from typing import Callable, Optional, Sequence
8+
from opentelemetry.context import Context
9+
from opentelemetry.trace import Link, SpanKind, format_trace_id
10+
from opentelemetry.sdk.trace.sampling import (
11+
Decision,
12+
Sampler,
13+
SamplingResult,
14+
_get_parent_trace_state,
15+
)
16+
from opentelemetry.trace.span import TraceState
17+
from opentelemetry.util.types import Attributes
18+
19+
from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY
20+
21+
from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_djb2_sample_score, _round_down_to_nearest
22+
23+
class _State:
24+
def __init__(self, effective_window_count: float, effective_window_nanos: float, last_nano_time: int):
25+
self.effective_window_count = effective_window_count
26+
self.effective_window_nanos = effective_window_nanos
27+
self.last_nano_time = last_nano_time
28+
29+
class RateLimitedSamplingPercentage:
30+
def __init__(self, target_spans_per_second_limit: float,
31+
nano_time_supplier: Optional[Callable[[], int]] = None, round_to_nearest: bool = True):
32+
if target_spans_per_second_limit < 0.0:
33+
raise ValueError("Limit for sampled spans per second must be nonnegative!")
34+
self._nano_time_supplier = nano_time_supplier or (lambda: int(time.time_ns()))
35+
# Hardcoded adaptation time of 0.1 seconds for adjusting to sudden changes in telemetry volumes
36+
adaptation_time_seconds = 0.1
37+
self._inverse_adaptation_time_nanos = 1e-9 / adaptation_time_seconds
38+
self._target_spans_per_nanosecond_limit = 1e-9 * target_spans_per_second_limit
39+
initial_nano_time = self._nano_time_supplier()
40+
self._state = _State(0.0, 0.0, initial_nano_time)
41+
self._lock = threading.Lock()
42+
self._round_to_nearest = round_to_nearest
43+
44+
def _update_state(self, old_state: _State, current_nano_time: int) -> _State:
45+
if current_nano_time <= old_state.last_nano_time:
46+
return _State(
47+
old_state.effective_window_count + 1,
48+
old_state.effective_window_nanos,
49+
old_state.last_nano_time
50+
)
51+
nano_time_delta = current_nano_time - old_state.last_nano_time
52+
decay_factor = math.exp(-nano_time_delta * self._inverse_adaptation_time_nanos)
53+
current_effective_window_count = old_state.effective_window_count * decay_factor + 1
54+
current_effective_window_nanos = old_state.effective_window_nanos * decay_factor + nano_time_delta
55+
56+
return _State(current_effective_window_count, current_effective_window_nanos, current_nano_time)
57+
58+
def get(self) -> float:
59+
"""Get the current sampling percentage (0.0 to 100.0)."""
60+
current_nano_time = self._nano_time_supplier()
61+
62+
with self._lock:
63+
old_state = self._state
64+
self._state = self._update_state(self._state, current_nano_time)
65+
current_state = self._state
66+
67+
# Calculate sampling probability based on current state
68+
if current_state.effective_window_count == 0:
69+
return 100.0
70+
71+
sampling_probability = (
72+
(current_state.effective_window_nanos * self._target_spans_per_nanosecond_limit) /
73+
current_state.effective_window_count
74+
)
75+
76+
sampling_percentage = 100 * min(sampling_probability, 1.0)
77+
78+
if self._round_to_nearest:
79+
sampling_percentage = _round_down_to_nearest(sampling_percentage)
80+
81+
return sampling_percentage
82+
83+
84+
class RateLimitedSampler(Sampler):
    """Sampler that adapts its sampling percentage to a target span rate.

    The percentage comes from a :class:`RateLimitedSamplingPercentage`; a span
    is sampled when the DJB2 score of its trace id is below that percentage.
    """

    def __init__(self, target_spans_per_second_limit: float):
        """Create the sampler.

        :param target_spans_per_second_limit: Desired upper bound of sampled
            spans per second, forwarded to ``RateLimitedSamplingPercentage``.
        """
        self._sampling_percentage_generator = RateLimitedSamplingPercentage(target_spans_per_second_limit)
        self._description = f"RateLimitedSampler{{{target_spans_per_second_limit}}}"

    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: Optional[SpanKind] = None,
        attributes: Attributes = None,
        links: Optional[Sequence["Link"]] = None,
        trace_state: Optional["TraceState"] = None,
    ) -> "SamplingResult":
        """Decide whether the span should be recorded and sampled.

        :param parent_context: Context of the parent span, used to propagate
            the parent's trace state into the result.
        :param trace_id: Trace id of the span; its hex form is hashed to get
            the sampling score.
        :param name: Span name (not used for the decision).
        :param kind: Span kind (not used for the decision).
        :param attributes: Initial span attributes. They are copied, never
            modified in place.
        :param links: Span links (not used for the decision).
        :param trace_state: Trace state (not used for the decision).
        :return: The decision together with the attributes extended by the
            sampling percentage under ``_SAMPLE_RATE_KEY``.
        """
        sampling_percentage = self._sampling_percentage_generator.get()
        sampling_score = _get_djb2_sample_score(format_trace_id(trace_id).lower())

        if sampling_score < sampling_percentage:
            decision = Decision.RECORD_AND_SAMPLE
        else:
            decision = Decision.DROP

        # Copy instead of writing into the argument: the mapping belongs to the
        # caller and may be immutable, in which case item assignment would raise.
        sampled_attributes = dict(attributes) if attributes else {}
        sampled_attributes[_SAMPLE_RATE_KEY] = sampling_percentage

        return SamplingResult(
            decision,
            sampled_attributes,
            _get_parent_trace_state(parent_context),
        )

    def get_description(self) -> str:
        """Return the human-readable description of this sampler."""
        return self._description

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from typing import no_type_check, Optional, Tuple
55
from urllib.parse import urlparse
6+
import math
67

78
from opentelemetry.semconv.attributes import (
89
client_attributes,
@@ -13,6 +14,11 @@
1314
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
1415
from opentelemetry.util.types import Attributes
1516

17+
from azure.monitor.opentelemetry.exporter._constants import (
18+
_HASH,
19+
_INTEGER_MAX,
20+
_INTEGER_MIN
21+
)
1622

1723
# pylint:disable=too-many-return-statements
1824
def _get_default_port_db(db_system: str) -> int:
@@ -320,3 +326,33 @@ def _get_url_for_http_request(attributes: Attributes) -> Optional[str]:
320326
http_target,
321327
)
322328
return url
329+
330+
def _get_djb2_sample_score(trace_id_hex: str) -> float:
    """Map a trace id string to a sampling score using a 32-bit DJB2 hash.

    :param trace_id_hex: Trace id as a string; every character feeds the hash.
    :return: ``100 * |hash| / _INTEGER_MAX``, where ``hash`` is the signed
        32-bit DJB2 hash of the input seeded with ``_HASH``.
    """
    digest = _HASH
    for code_point in map(ord, trace_id_hex):
        # hash * 33 + c, truncated to 32 bits to emulate integer overflow.
        digest = (digest * 33 + code_point) & 0xFFFFFFFF

    # Reinterpret the unsigned 32-bit pattern as a signed (two's complement) value.
    if digest >= 0x80000000:
        digest -= 0x100000000

    # The minimum value has no positive counterpart in 32 bits, so it is
    # mapped to the maximum instead of taking its absolute value.
    magnitude = int(_INTEGER_MAX) if digest == _INTEGER_MIN else abs(digest)

    return 100.0 * (float(magnitude) / _INTEGER_MAX)
347+
348+
def _round_down_to_nearest(sampling_percentage: float) -> float:
349+
if sampling_percentage == 0:
350+
return 0
351+
# Handle extremely small percentages that would cause overflow
352+
if sampling_percentage <= _INTEGER_MIN: # Extremely small threshold
353+
return 0.0
354+
item_count = 100.0 / sampling_percentage
355+
# Handle case where item_count is infinity or too large for math.ceil
356+
if not math.isfinite(item_count) or item_count >= _INTEGER_MAX:
357+
return 0.0
358+
return 100.0 / math.ceil(item_count)

0 commit comments

Comments
 (0)