35
35
import java .util .concurrent .ConcurrentLinkedQueue ;
36
36
import java .util .concurrent .ConcurrentMap ;
37
37
import java .util .concurrent .atomic .AtomicLong ;
38
+ import java .util .concurrent .atomic .AtomicLongArray ;
38
39
import java .util .concurrent .atomic .AtomicReference ;
40
+ import java .util .concurrent .atomic .AtomicReferenceArray ;
39
41
import java .util .concurrent .locks .Lock ;
40
42
import java .util .concurrent .locks .ReentrantLock ;
41
43
@@ -132,14 +134,20 @@ public final class PrivateMaxEntriesMap<K, V> extends AbstractMap<K, V>
132
134
/** The maximum capacity of the map. */
133
135
static final long MAXIMUM_CAPACITY = Long .MAX_VALUE - Integer .MAX_VALUE ;
134
136
135
- /** The number of read buffers to use. */
136
- static final int NUMBER_OF_READ_BUFFERS = ceilingNextPowerOfTwo (NCPU );
137
+ /**
138
+ * The number of read buffers to use.
139
+ * The max of 4 was introduced due to https://github.com/FasterXML/jackson-databind/issues/3665.
140
+ */
141
+ static final int NUMBER_OF_READ_BUFFERS = Math .min (4 , ceilingNextPowerOfTwo (NCPU ));
137
142
138
143
/** Mask value for indexing into the read buffers. */
139
144
static final int READ_BUFFERS_MASK = NUMBER_OF_READ_BUFFERS - 1 ;
140
145
141
- /** The number of pending read operations before attempting to drain. */
142
- static final int READ_BUFFER_THRESHOLD = 32 ;
146
+ /**
147
+ * The number of pending read operations before attempting to drain.
148
+ * The threshold of 4 was introduced due to https://github.com/FasterXML/jackson-databind/issues/3665.
149
+ */
150
+ static final int READ_BUFFER_THRESHOLD = 4 ;
143
151
144
152
/** The maximum number of read operations to perform per amortized drain. */
145
153
static final int READ_BUFFER_DRAIN_THRESHOLD = 2 * READ_BUFFER_THRESHOLD ;
@@ -175,16 +183,20 @@ static int ceilingNextPowerOfTwo(int x) {
175
183
176
184
final Lock evictionLock ;
177
185
final Queue <Runnable > writeBuffer ;
178
- final AtomicLong [] readBufferWriteCount ;
179
- final AtomicLong [] readBufferDrainAtWriteCount ;
180
- final AtomicReference <Node <K , V >>[][] readBuffers ;
186
+ final AtomicLongArray readBufferWriteCount ;
187
+ final AtomicLongArray readBufferDrainAtWriteCount ;
188
+ final AtomicReferenceArray <Node <K , V >> readBuffers ;
181
189
182
190
final AtomicReference <DrainStatus > drainStatus ;
183
191
184
192
transient Set <K > keySet ;
185
193
transient Collection <V > values ;
186
194
transient Set <Entry <K , V >> entrySet ;
187
195
196
+ private static int readBufferIndex (int bufferIndex , int entryIndex ) {
197
+ return READ_BUFFER_SIZE * bufferIndex + entryIndex ;
198
+ }
199
+
188
200
/**
189
201
* Creates an instance based on the builder's configuration.
190
202
*/
@@ -203,17 +215,9 @@ private PrivateMaxEntriesMap(Builder<K, V> builder) {
203
215
drainStatus = new AtomicReference <DrainStatus >(IDLE );
204
216
205
217
readBufferReadCount = new long [NUMBER_OF_READ_BUFFERS ];
206
- readBufferWriteCount = new AtomicLong [NUMBER_OF_READ_BUFFERS ];
207
- readBufferDrainAtWriteCount = new AtomicLong [NUMBER_OF_READ_BUFFERS ];
208
- readBuffers = new AtomicReference [NUMBER_OF_READ_BUFFERS ][READ_BUFFER_SIZE ];
209
- for (int i = 0 ; i < NUMBER_OF_READ_BUFFERS ; i ++) {
210
- readBufferWriteCount [i ] = new AtomicLong ();
211
- readBufferDrainAtWriteCount [i ] = new AtomicLong ();
212
- readBuffers [i ] = new AtomicReference [READ_BUFFER_SIZE ];
213
- for (int j = 0 ; j < READ_BUFFER_SIZE ; j ++) {
214
- readBuffers [i ][j ] = new AtomicReference <Node <K , V >>();
215
- }
216
- }
218
+ readBufferWriteCount = new AtomicLongArray (NUMBER_OF_READ_BUFFERS );
219
+ readBufferDrainAtWriteCount = new AtomicLongArray (NUMBER_OF_READ_BUFFERS );
220
+ readBuffers = new AtomicReferenceArray <>(NUMBER_OF_READ_BUFFERS * READ_BUFFER_SIZE );
217
221
}
218
222
219
223
/** Ensures that the object is not null. */
@@ -330,12 +334,11 @@ long recordRead(int bufferIndex, Node<K, V> node) {
330
334
// The location in the buffer is chosen in a racy fashion as the increment
331
335
// is not atomic with the insertion. This means that concurrent reads can
332
336
// overlap and overwrite one another, resulting in a lossy buffer.
333
- final AtomicLong counter = readBufferWriteCount [bufferIndex ];
334
- final long writeCount = counter .get ();
335
- counter .lazySet (writeCount + 1 );
337
+ final long writeCount = readBufferWriteCount .get (bufferIndex );
338
+ readBufferWriteCount .lazySet (bufferIndex , writeCount + 1 );
336
339
337
340
final int index = (int ) (writeCount & READ_BUFFER_INDEX_MASK );
338
- readBuffers [ bufferIndex ][ index ] .lazySet (node );
341
+ readBuffers .lazySet (readBufferIndex ( bufferIndex , index ), node );
339
342
340
343
return writeCount ;
341
344
}
@@ -348,7 +351,7 @@ long recordRead(int bufferIndex, Node<K, V> node) {
348
351
* @param writeCount the number of writes on the chosen read buffer
349
352
*/
350
353
void drainOnReadIfNeeded (int bufferIndex , long writeCount ) {
351
- final long pending = (writeCount - readBufferDrainAtWriteCount [ bufferIndex ] .get ());
354
+ final long pending = (writeCount - readBufferDrainAtWriteCount .get (bufferIndex ));
352
355
final boolean delayable = (pending < READ_BUFFER_THRESHOLD );
353
356
final DrainStatus status = drainStatus .get ();
354
357
if (status .shouldDrainBuffers (delayable )) {
@@ -403,20 +406,20 @@ void drainReadBuffers() {
403
406
/** Drains the read buffer up to an amortized threshold. */
404
407
//@GuardedBy("evictionLock")
405
408
void drainReadBuffer (int bufferIndex ) {
406
- final long writeCount = readBufferWriteCount [ bufferIndex ] .get ();
409
+ final long writeCount = readBufferWriteCount .get (bufferIndex );
407
410
for (int i = 0 ; i < READ_BUFFER_DRAIN_THRESHOLD ; i ++) {
408
411
final int index = (int ) (readBufferReadCount [bufferIndex ] & READ_BUFFER_INDEX_MASK );
409
- final AtomicReference < Node < K , V >> slot = readBuffers [ bufferIndex ][ index ] ;
410
- final Node <K , V > node = slot .get ();
412
+ final int arrayIndex = readBufferIndex ( bufferIndex , index ) ;
413
+ final Node <K , V > node = readBuffers .get (arrayIndex );
411
414
if (node == null ) {
412
415
break ;
413
416
}
414
417
415
- slot .lazySet (null );
418
+ readBuffers .lazySet (arrayIndex , null );
416
419
applyRead (node );
417
420
readBufferReadCount [bufferIndex ]++;
418
421
}
419
- readBufferDrainAtWriteCount [ bufferIndex ] .lazySet (writeCount );
422
+ readBufferDrainAtWriteCount .lazySet (bufferIndex , writeCount );
420
423
}
421
424
422
425
/** Updates the node's location in the page replacement policy. */
@@ -579,10 +582,8 @@ public void clear() {
579
582
}
580
583
581
584
// Discard all pending reads
582
- for (AtomicReference <Node <K , V >>[] buffer : readBuffers ) {
583
- for (AtomicReference <Node <K , V >> slot : buffer ) {
584
- slot .lazySet (null );
585
- }
585
+ for (int i = 0 ; i < readBuffers .length (); i ++) {
586
+ readBuffers .lazySet (i , null );
586
587
}
587
588
588
589
// Apply all pending writes
0 commit comments