2
2
3
3
#include < atomic>
4
4
#include < cassert>
5
+ #include < cstddef>
5
6
#include < cstdint>
6
7
#include < memory>
8
+ #include < new>
7
9
#include < optional>
8
10
#include < type_traits>
9
11
#include < utility>
10
12
#include < vector>
11
13
12
// This (stand-alone) file implements the deque described in the papers, "Correct and Efficient
// Work-Stealing for Weak Memory Models," and "Dynamic Circular Work-Stealing Deque". Both are
// available in 'reference/'.
15
17
16
18
namespace riften {
17
19
@@ -27,11 +29,15 @@ template <typename T> struct RingBuff {
27
29
28
30
std::int64_t capacity () const noexcept { return _cap; }
29
31
30
    // Store `x` at the modulo-capacity index `i`. This is a plain (non-atomic) write: the owner
    // thread publishes the slot to stealers only afterwards, via the deque's atomic `_bottom`
    // store, which provides the required ordering. Requires a nothrow move so the buffer can
    // never be left half-written by a throwing assignment.
    void store(std::int64_t i, T&& x) noexcept requires std::is_nothrow_move_assignable_v<T> {
        _buff[i & _mask] = std::move(x);
    }
32
36
33
    // Load a copy of the element at the modulo-capacity index `i`. Also a plain (non-atomic)
    // read; callers (pop/steal) establish ordering with the deque's atomics before trusting the
    // value. Requires a nothrow move so returning the copy cannot throw.
    T load(std::int64_t i) const noexcept requires std::is_nothrow_move_constructible_v<T> {
        return _buff[i & _mask];
    }
35
41
36
42
// Allocates and returns a new ring buffer, copies elements in range [b, t) into the new buffer.
37
43
RingBuff<T>* resize (std::int64_t b, std::int64_t t) const {
@@ -44,34 +50,28 @@ template <typename T> struct RingBuff {
44
50
45
51
private:
46
52
std::int64_t _cap; // Capacity of the buffer
47
- std::int64_t _mask; // Bitmask to perform modulo capacity operations
53
+ std::int64_t _mask; // Bit mask to perform modulo capacity operations
48
54
49
- #if !__cpp_lib_smart_ptr_for_overwrite
50
- std::unique_ptr<std::atomic<T>[]> _buff = std::make_unique<std::atomic<T>[]>(_cap);
51
- #else
52
- std::unique_ptr<std::atomic<T>[]> _buff = std::make_unique_for_overwrite<std::atomic<T>[]>(_cap);
53
- #endif
55
+ std::unique_ptr<T[]> _buff = std::make_unique_for_overwrite<T[]>(_cap);
54
56
};
55
57
56
- template <typename T> struct is_always_lock_free {
57
- static constexpr bool value = std::atomic<T>::is_always_lock_free;
58
- };
58
+ } // namespace detail
59
59
60
- template <typename T> static constexpr bool lock_free_v
61
- = std::conjunction_v<std::is_trivially_copyable<T>,
62
- std::is_copy_constructible<T>,
63
- std::is_move_constructible<T>,
64
- std::is_copy_assignable<T>,
65
- std::is_move_assignable<T>,
66
- is_always_lock_free<T>>;
60
// Minimum offset between two objects to avoid false sharing. Use the standard constant from
// <new> when the library provides it, otherwise fall back to a conservative guess.
#ifdef __cpp_lib_hardware_interference_size
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
inline constexpr std::size_t hardware_destructive_interference_size = 2 * sizeof(std::max_align_t);
#endif
67
66
68
- } // namespace detail
67
// Constraint on Deque's element type: T's destructor must be trivial, so that a potentially
// corrupt copy produced by a lost steal-race (see Deque::steal) can be discarded without
// running any destructor, and so that ~Deque need not destroy remaining elements.
template <typename T>
concept trivially_destructible = std::is_trivially_destructible_v<T>;
69
69
70
// Lock-free single-producer multiple-consumer deque. Only the deque owner can perform pop and push
// operations where the deque behaves like a stack. Others can (only) steal data from the deque, they see
// a FIFO queue. All threads must have finished using the deque before it is destructed. T must be
// trivially destructible and have nothrow move constructor/assignment operators.
74
+ template <trivially_destructible T> class Deque {
75
75
public:
76
76
// Constructs the deque with a given capacity the capacity of the deque (must be power of 2)
77
77
explicit Deque (std::int64_t cap = 1024 );
@@ -89,155 +89,133 @@ template <typename T> class Deque {
89
89
// Test if empty at instance of call
90
90
bool empty () const noexcept ;
91
91
92
    // Emplace an item to the deque. Only the owner thread can insert an item to the deque. The
    // operation can trigger the deque to resize its cap if more space is required. Provides the
    // strong exception guarantee.
95
95
template <typename ... Args> void emplace (Args&&... args);
96
96
97
- // Pops out an item from the deque. Only the owner thread can pop out an item from the deque. The
98
- // return can be a std::nullopt if this operation failed (empty deque).
97
+ // Pops out an item from the deque. Only the owner thread can pop out an item from the deque.
98
+ // The return can be a std::nullopt if this operation fails (empty deque).
99
99
std::optional<T> pop () noexcept ;
100
100
101
    // Steals an item from the deque. Any thread can try to steal an item from the deque. The return
    // can be a std::nullopt if this operation failed (not necessarily empty).
103
103
std::optional<T> steal () noexcept ;
104
104
105
105
// Destruct the deque, all threads must have finished using the deque.
106
106
~Deque () noexcept ;
107
107
108
- // If true elements of type `T` are stored directly in the ring buffer.
109
- static constexpr bool no_alloc = std::is_trivially_destructible_v<T> && detail::lock_free_v<T>;
110
-
111
108
private:
112
- using buffer_t = detail::RingBuff<std::conditional_t <no_alloc, T, T*>>;
109
+ alignas (hardware_destructive_interference_size) std::atomic<std::int64_t > _top;
110
+ alignas (hardware_destructive_interference_size) std::atomic<std::int64_t > _bottom;
111
+ alignas (hardware_destructive_interference_size) std::atomic<detail::RingBuff<T>*> _buffer;
113
112
114
- std::atomic<std::int64_t > _top; // Top of deque
115
- std::atomic<std::int64_t > _bottom; // Bottom of deque.
116
- std::atomic<buffer_t *> _buffer; // Current buffer.
117
- std::vector<std::unique_ptr<buffer_t >> _garbage; // Store old buffers here.
113
+ std::vector<std::unique_ptr<detail::RingBuff<T>>> _garbage; // Store old buffers here.
118
114
119
- // Convinience aliases.
115
+ // Convenience aliases.
120
116
static constexpr std::memory_order relaxed = std::memory_order_relaxed;
121
117
static constexpr std::memory_order consume = std::memory_order_consume;
122
118
static constexpr std::memory_order acquire = std::memory_order_acquire;
123
119
static constexpr std::memory_order release = std::memory_order_release;
124
120
static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;
125
121
};
126
122
127
- template <typename T> Deque<T>::Deque(std::int64_t cap)
128
- : _top(0 ), _bottom(0 ), _buffer(new buffer_t {cap}) {
123
+ template <trivially_destructible T> Deque<T>::Deque(std::int64_t cap)
124
+ : _top(0 ), _bottom(0 ), _buffer(new detail::RingBuff<T> {cap}) {
129
125
_garbage.reserve (32 );
130
126
}
131
127
132
- template <typename T> std::size_t Deque<T>::size() const noexcept {
128
+ template <trivially_destructible T> std::size_t Deque<T>::size() const noexcept {
133
129
int64_t b = _bottom.load (relaxed);
134
130
int64_t t = _top.load (relaxed);
135
131
return static_cast <std::size_t >(b >= t ? b - t : 0 );
136
132
}
137
133
138
- template <typename T> int64_t Deque<T>::capacity() const noexcept {
134
+ template <trivially_destructible T> int64_t Deque<T>::capacity() const noexcept {
139
135
return _buffer.load (relaxed)->capacity ();
140
136
}
141
137
142
- template <typename T> bool Deque<T>::empty() const noexcept { return !size (); }
138
+ template <trivially_destructible T> bool Deque<T>::empty() const noexcept { return !size (); }
139
+
140
// Emplace an item onto the bottom of the deque. Owner thread only. May grow the ring buffer,
// retiring the old one into _garbage (stealers may still be reading it). Provides the strong
// exception guarantee: if construction or the resize allocation throws, the deque is unchanged.
template <trivially_destructible T> template <typename... Args> void Deque<T>::emplace(Args&&... args) {
    // Construct before acquiring slot in-case constructor throws
    T object(std::forward<Args>(args)...);

    std::int64_t b = _bottom.load(relaxed);
    std::int64_t t = _top.load(acquire);
    detail::RingBuff<T>* buf = _buffer.load(relaxed);

    if (buf->capacity() < (b - t) + 1) {
        // Queue is full, build a new one. The old buffer is kept alive in _garbage because
        // concurrent stealers may still hold a pointer to it.
        _garbage.emplace_back(std::exchange(buf, buf->resize(b, t)));
        _buffer.store(buf, relaxed);
    }

    // Construct new object, this does not have to be atomic as no one can steal this item until after we
    // store the new value of bottom, ordering is maintained by surrounding atomics.
    buf->store(b, std::move(object));

    // Release fence: the slot write above happens-before any stealer that observes b + 1.
    std::atomic_thread_fence(release);
    _bottom.store(b + 1, relaxed);
}
165
161
166
// Pop an item from the bottom (LIFO end) of the deque. Owner thread only. Returns std::nullopt
// if the deque is empty or a stealer wins the race for the last item.
template <trivially_destructible T> std::optional<T> Deque<T>::pop() noexcept {
    std::int64_t b = _bottom.load(relaxed) - 1;
    detail::RingBuff<T>* buf = _buffer.load(relaxed);

    _bottom.store(b, relaxed);  // Stealers can no longer steal

    // seq_cst fence orders the bottom store above against the top load below, so the owner and a
    // concurrent stealer cannot both claim the same (last) slot unobserved.
    std::atomic_thread_fence(seq_cst);
    std::int64_t t = _top.load(relaxed);

    if (t <= b) {
        // Non-empty deque
        if (t == b) {
            // The last item could get stolen, by a stealer that loaded bottom before our write above
            if (!_top.compare_exchange_strong(t, t + 1, seq_cst, relaxed)) {
                // Failed race, thief got the last item.
                _bottom.store(b + 1, relaxed);
                return std::nullopt;
            }
            // Won the race; restore bottom to the canonical empty state (top == bottom).
            _bottom.store(b + 1, relaxed);
        }

        // Can delay load until after acquiring slot as only this thread can push(), this load is not
        // required to be atomic as we are the exclusive writer.
        return buf->load(b);

    } else {
        // Deque was already empty; undo the speculative bottom decrement.
        _bottom.store(b + 1, relaxed);
        return std::nullopt;
    }
}
201
192
202
// Steal an item from the top (FIFO end) of the deque. Any thread may call this. Returns
// std::nullopt on failure (lost race or empty) — failure does not imply the deque is empty.
template <trivially_destructible T> std::optional<T> Deque<T>::steal() noexcept {
    std::int64_t t = _top.load(acquire);
    // seq_cst fence pairs with the fence in pop(), ordering our top load against the bottom load.
    std::atomic_thread_fence(seq_cst);
    std::int64_t b = _bottom.load(acquire);

    if (t < b) {
        // Must load *before* acquiring the slot as the slot may be overwritten immediately after
        // acquiring. This load is not required to be atomic even though it may race with an
        // overwrite: we only return the value if we win the CAS below, which guarantees no writer
        // touched the slot during our read. If we lose the race then 'x' could be corrupt due to a
        // read-during-write race, but as T is trivially destructible, discarding it is harmless.
        // NOTE(review): a racing non-atomic read is formally a data race (UB) under the C++ memory
        // model even if the value is discarded — consider std::atomic_ref or per-byte relaxed
        // copies if strict conformance is required.
        T x = _buffer.load(consume)->load(t);

        if (!_top.compare_exchange_strong(t, t + 1, seq_cst, relaxed)) {
            // Failed race: another stealer (or the owner's pop) claimed this slot first.
            return std::nullopt;
        }

        return x;

    } else {
        // Empty deque.
        return std::nullopt;
    }
}
229
218
230
// Destruct the deque; all threads must have finished using it. Only the live buffer needs an
// explicit delete — retired buffers are unique_ptrs in _garbage and are freed by the vector's
// destructor, and remaining elements need no destruction because T is trivially destructible.
template <trivially_destructible T> Deque<T>::~Deque() noexcept { delete _buffer.load(); }
242
220
243
}  // namespace riften