1
+ #pragma once
2
+
3
+ #include " Iyp/WaitFreeRingBufferUtilities/optional-type.inl"
4
+ #include " Iyp/WaitFreeRingBufferUtilities/details/cache-aligned-and-padded-object.inl"
5
+
6
+ #include < cstdint>
7
+ #include < utility>
8
+ #include < limits>
9
+ #include < cstddef>
10
+ #include < atomic>
11
+ #include < array>
12
+
13
+ namespace Iyp
14
+ {
15
+ namespace WaitFreeRingBufferUtilities
16
+ {
17
+ namespace Details
18
+ {
19
+ template <typename T, typename ... ElementFeatures>
20
+ struct Element : ElementFeatures...
21
+ {
22
+ using ElementType = T;
23
+
24
+ std::atomic<T *> value_ptr;
25
+ typename std::aligned_storage<sizeof (T), alignof (T)>::type storage;
26
+
27
+ Element () : value_ptr(nullptr )
28
+ {
29
+ }
30
+
31
+ Element (const Element &) = delete ;
32
+ Element (Element &&) = delete ;
33
+
34
+ Element &operator =(const Element &) = delete ;
35
+ Element &operator =(Element &&) = delete ;
36
+
37
+ ~Element ()
38
+ {
39
+ const auto local_value_ptr = value_ptr.load (std::memory_order_relaxed);
40
+ if (local_value_ptr)
41
+ local_value_ptr->~T ();
42
+ }
43
+ };
44
+
45
+ template <typename Element, std::size_t Count>
46
+ struct RingBufferStateBase
47
+ {
48
+ // protected:
49
+ using ElementType = typename Element::ElementType;
50
+
51
+ enum : std::size_t
52
+ {
53
+ COUNT = Count,
54
+ COUNT_MASK = Count - 1 ,
55
+ };
56
+
57
+ std::array<CacheAlignedAndPaddedObject<Element>, Count> elements{};
58
+
59
+ public:
60
+ static_assert (Count > 0 && !(COUNT_MASK & Count), " Count should be a power of two." );
61
+ };
62
+
63
+ namespace Private
64
+ {
65
+ template <std::size_t Count>
66
+ struct CountInt64CompatibilityCheck
67
+ {
68
+ static_assert (Count <= static_cast <std::size_t >(std::numeric_limits<std::int64_t >::max()), " Count exceeds the maximum. Count should fit in a std::int64_t." );
69
+ };
70
+ } // namespace Private
71
+
72
+ template <typename ProducerTypeTraits, typename ConsumerTypeTraits, typename T, std::size_t Count>
73
+ struct RingBufferTypeConstructor : ProducerTypeTraits::template Behavior<
74
+ typename ConsumerTypeTraits::template Behavior<
75
+ typename ProducerTypeTraits::template SharedState<
76
+ typename ConsumerTypeTraits::template SharedState<
77
+ RingBufferStateBase<
78
+ Element<T, typename ProducerTypeTraits::ElementFeature, typename ConsumerTypeTraits::ElementFeature>,
79
+ Count>>>>>
80
+ {
81
+ };
82
+
83
+ struct MultiProducerTypeTraits
84
+ {
85
+ template <typename BaseType>
86
+ struct Behavior : BaseType
87
+ {
88
+ CacheAlignedAndPaddedObject<std::atomic_size_t > end{std::size_t (0 )};
89
+
90
+ public:
91
+ using ElementType = typename BaseType::ElementType;
92
+
93
+ template <typename ... Args>
94
+ bool push (Args &&... args)
95
+ {
96
+ if (BaseType::push_task_count.fetch_sub (1 , std::memory_order_acquire) <= std::int64_t (0 ))
97
+ {
98
+ BaseType::push_task_count.fetch_add (1 , std::memory_order_relaxed);
99
+ return false ;
100
+ }
101
+
102
+ while (true )
103
+ {
104
+ const std::size_t local_end = end.fetch_add (1 , std::memory_order_acquire);
105
+ const std::size_t element_index = local_end & BaseType::COUNT_MASK;
106
+ bool expected_is_pusher_processing = false ;
107
+ if (std::atomic_compare_exchange_strong (&BaseType::elements[element_index].is_pusher_processing , &expected_is_pusher_processing, true ))
108
+ {
109
+ if (BaseType::elements[element_index].value_ptr .load (std::memory_order_acquire))
110
+ {
111
+ BaseType::elements[element_index].is_pusher_processing .store (false , std::memory_order_relaxed);
112
+ continue ;
113
+ }
114
+
115
+ BaseType::elements[element_index].value_ptr .store (new (&BaseType::elements[element_index].storage ) ElementType (std::forward<Args>(args)...),
116
+ std::memory_order_release);
117
+ BaseType::pop_task_count.fetch_add (1 , std::memory_order_release);
118
+
119
+ BaseType::elements[element_index].is_pusher_processing .store (false , std::memory_order_release);
120
+ return true ;
121
+ }
122
+ }
123
+ }
124
+ };
125
+
126
+ template <typename BaseType, typename = Private::CountInt64CompatibilityCheck<BaseType::COUNT>>
127
+ struct SharedState : BaseType
128
+ {
129
+ CacheAlignedAndPaddedObject<std::atomic<std::int64_t >> push_task_count{static_cast <std::int64_t >(BaseType::COUNT)};
130
+ };
131
+
132
+ struct ElementFeature
133
+ {
134
+ CacheAlignedAndPaddedObject<std::atomic_bool> is_pusher_processing{false };
135
+ };
136
+ };
137
+
138
+ struct MultiConsumerTypeTraits
139
+ {
140
+ template <typename BaseType>
141
+ struct Behavior : BaseType
142
+ {
143
+ private:
144
+ CacheAlignedAndPaddedObject<std::atomic_size_t > begin{std::size_t (0 )};
145
+
146
+ public:
147
+ using ElementType = typename BaseType::ElementType;
148
+
149
+ OptionalType<ElementType> pop ()
150
+ {
151
+ if (BaseType::pop_task_count.fetch_sub (1 , std::memory_order_acquire) <= std::int64_t (0 ))
152
+ {
153
+ BaseType::pop_task_count.fetch_add (1 , std::memory_order_relaxed);
154
+ return OptionalType<ElementType>{};
155
+ }
156
+
157
+ while (true )
158
+ {
159
+ const std::size_t local_begin = begin.fetch_add (1 , std::memory_order_acquire);
160
+
161
+ const std::size_t element_index = local_begin & BaseType::COUNT_MASK;
162
+ bool expected_is_popper_processing = false ;
163
+ if (std::atomic_compare_exchange_strong (&BaseType::elements[element_index].is_popper_processing , &expected_is_popper_processing, true ))
164
+ {
165
+ const auto value_ptr = BaseType::elements[element_index].value_ptr .load (std::memory_order_acquire);
166
+ if (!value_ptr)
167
+ {
168
+ BaseType::elements[element_index].is_popper_processing .store (false , std::memory_order_relaxed);
169
+ continue ;
170
+ }
171
+
172
+ OptionalType<ElementType> result{std::move (*value_ptr)};
173
+ value_ptr->~ElementType ();
174
+
175
+ BaseType::elements[element_index].value_ptr .store (nullptr , std::memory_order_release);
176
+ BaseType::push_task_count.fetch_add (1 , std::memory_order_release);
177
+
178
+ BaseType::elements[element_index].is_popper_processing .store (false , std::memory_order_release);
179
+ return result;
180
+ }
181
+ }
182
+ }
183
+ };
184
+
185
+ template <typename BaseType, typename = Private::CountInt64CompatibilityCheck<BaseType::COUNT>>
186
+ struct SharedState : BaseType
187
+ {
188
+ CacheAlignedAndPaddedObject<std::atomic<std::int64_t >> pop_task_count{std::int64_t {0 }};
189
+ };
190
+
191
+ struct ElementFeature
192
+ {
193
+ CacheAlignedAndPaddedObject<std::atomic_bool> is_popper_processing{false };
194
+ };
195
+ };
196
+
197
+ struct SingleProducerTypeTraits
198
+ {
199
+ template <typename BaseType>
200
+ struct Behavior : BaseType
201
+ {
202
+ private:
203
+ struct State
204
+ {
205
+ std::size_t pushed_task_count{0 };
206
+ std::size_t end{0 };
207
+ };
208
+
209
+ CacheAlignedAndPaddedObject<State> state{};
210
+
211
+ public:
212
+ using ElementType = typename BaseType::ElementType;
213
+
214
+ template <typename ... Args>
215
+ bool push (Args &&... args)
216
+ {
217
+ if (BaseType::push_task_count.load (std::memory_order_acquire) == state.pushed_task_count )
218
+ return false ;
219
+
220
+ state.pushed_task_count ++;
221
+
222
+ while (true )
223
+ {
224
+ const std::size_t element_index = (state.end ++) & BaseType::COUNT_MASK;
225
+
226
+ if (!BaseType::elements[element_index].value_ptr .load (std::memory_order_acquire))
227
+ {
228
+ BaseType::elements[element_index].value_ptr .store (new (&BaseType::elements[element_index].storage ) ElementType (std::forward<Args>(args)...),
229
+ std::memory_order_release);
230
+ BaseType::pop_task_count.fetch_add (1 , std::memory_order_release);
231
+
232
+ return true ;
233
+ }
234
+ }
235
+ }
236
+ };
237
+
238
+ template <typename BaseType>
239
+ struct SharedState : BaseType
240
+ {
241
+ CacheAlignedAndPaddedObject<std::atomic_size_t > push_task_count{BaseType::COUNT};
242
+ };
243
+
244
+ struct ElementFeature
245
+ {
246
+ };
247
+ };
248
+
249
+ struct SingleConsumerTypeTraits
250
+ {
251
+
252
+ template <typename BaseType>
253
+ struct Behavior : BaseType
254
+ {
255
+ private:
256
+ struct State
257
+ {
258
+ std::size_t popped_task_count{0 };
259
+ std::size_t begin{0 };
260
+ };
261
+
262
+ CacheAlignedAndPaddedObject<State> state{};
263
+
264
+ public:
265
+ using ElementType = typename BaseType::ElementType;
266
+
267
+ OptionalType<ElementType> pop ()
268
+ {
269
+ if (BaseType::pop_task_count.load (std::memory_order_acquire) == state.popped_task_count )
270
+ return OptionalType<ElementType>{};
271
+
272
+ state.popped_task_count ++;
273
+
274
+ while (true )
275
+ {
276
+ const std::size_t element_index = (state.begin ++) & BaseType::COUNT_MASK;
277
+
278
+ const auto value_ptr = BaseType::elements[element_index].value_ptr .load (std::memory_order_acquire);
279
+ if (value_ptr)
280
+ {
281
+ OptionalType<ElementType> result{std::move (*value_ptr)};
282
+ value_ptr->~ElementType ();
283
+
284
+ BaseType::elements[element_index].value_ptr .store (nullptr , std::memory_order_release);
285
+ BaseType::push_task_count.fetch_add (1 , std::memory_order_release);
286
+
287
+ return result;
288
+ }
289
+ }
290
+ }
291
+ };
292
+
293
+ template <typename BaseType>
294
+ struct SharedState : BaseType
295
+ {
296
+ CacheAlignedAndPaddedObject<std::atomic_size_t > pop_task_count{std::size_t {0 }};
297
+ };
298
+
299
+ struct ElementFeature
300
+ {
301
+ };
302
+ };
303
+
304
+ } // namespace Details
305
+ } // namespace WaitFreeRingBufferUtilities
306
+ } // namespace Iyp
0 commit comments