6
6
#include < util/datetime/base.h>
7
7
8
8
template <class TTimer >
9
- class TLockFreeBucket {
9
+ class alignas ( 128 ) TLockFreeBucket { // align to cache line size
10
10
public:
11
11
TLockFreeBucket (std::atomic<i64 >& maxTokens, std::atomic<i64 >& minTokens, std::atomic<ui64>& inflowPerSecond)
12
12
: MaxTokens (maxTokens)
@@ -19,36 +19,47 @@ class TLockFreeBucket {
19
19
}
20
20
21
21
bool IsEmpty () {
22
- FillBucket ( );
23
- return Tokens.load () <= 0 ;
22
+ FillAndTake ( 0 );
23
+ return Tokens.load (std::memory_order_relaxed ) <= 0 ;
24
24
}
25
25
26
- void FillAndTake (i64 tokens) {
27
- FillBucket ( );
28
- TakeTokens (tokens );
29
- }
26
+ void FillAndTake (ui64 tokens) {
27
+ TTime prev = LastUpdate. load (std::memory_order_acquire );
28
+ TTime now = TTimer::Now ( );
29
+ i64 duration = TTimer::Duration (prev, now);
30
30
31
- private:
32
- void FillBucket () {
33
- TTime prev;
34
- TTime now;
35
- for (prev = LastUpdate.load (), now = TTimer::Now (); !LastUpdate.compare_exchange_strong (prev, now); ) {}
36
-
37
- ui64 rawInflow = InflowPerSecond.load () * TTimer::Duration (prev, now);
38
- if (rawInflow >= TTimer::Resolution) {
39
- Tokens.fetch_add (rawInflow / TTimer::Resolution);
40
- for (i64 tokens = Tokens.load (), maxTokens = MaxTokens.load (); tokens > maxTokens; ) {
41
- if (Tokens.compare_exchange_strong (tokens, maxTokens)) {
42
- break ;
43
- }
31
+ while (true ) {
32
+ if (prev >= now) {
33
+ duration = 0 ;
34
+ break ;
35
+ }
36
+
37
+ if (LastUpdate.compare_exchange_weak (prev, now,
38
+ std::memory_order_release,
39
+ std::memory_order_acquire)) {
40
+ break ;
44
41
}
45
42
}
46
- }
47
43
48
- void TakeTokens (i64 tokens) {
49
- Tokens.fetch_sub (tokens);
50
- for (i64 tokens = Tokens.load (), minTokens = MinTokens.load (); tokens < minTokens; ) {
51
- if (Tokens.compare_exchange_strong (tokens, minTokens)) {
44
+ i64 currentTokens = Tokens.load (std::memory_order_acquire);
45
+
46
+ ui64 rawInflow = InflowPerSecond.load (std::memory_order_relaxed) * duration;
47
+ i64 minTokens = MinTokens.load (std::memory_order_relaxed);
48
+ i64 maxTokens = MaxTokens.load (std::memory_order_relaxed);
49
+ Y_DEBUG_ABORT_UNLESS (minTokens <= maxTokens);
50
+
51
+ while (true ) {
52
+ i64 newTokens = currentTokens + rawInflow / TTimer::Resolution;
53
+ newTokens = std::min (newTokens, maxTokens);
54
+ newTokens = newTokens - tokens;
55
+ newTokens = std::max (newTokens, minTokens);
56
+
57
+ if (newTokens == currentTokens) {
58
+ break ;
59
+ }
60
+
61
+ if (Tokens.compare_exchange_weak (currentTokens, newTokens, std::memory_order_release,
62
+ std::memory_order_acquire)) {
52
63
break ;
53
64
}
54
65
}
@@ -63,4 +74,9 @@ class TLockFreeBucket {
63
74
64
75
std::atomic<i64 > Tokens;
65
76
std::atomic<TTime> LastUpdate;
77
+
78
+ constexpr static ui32 CacheLineFillerSize = 128 - sizeof (std::atomic<i64 >&) * 2 - sizeof (std::atomic<ui64>&)
79
+ - sizeof (std::atomic<i64 >) - sizeof (std::atomic<TTime>);
80
+
81
+ std::array<char , CacheLineFillerSize> CacheLineFiller;
66
82
};
0 commit comments