6
6
7
7
import java .io .Serializable ;
8
8
import java .util .Arrays ;
9
- import java .util .concurrent .atomic .AtomicInteger ;
10
- import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
11
- import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
12
9
13
10
import static cn .myperf4j .base .util .UnsafeUtils .fieldOffset ;
14
- import static java .lang .Integer .MAX_VALUE ;
15
11
import static java .lang .Integer .MIN_VALUE ;
16
- import static java .util .concurrent .atomic .AtomicLongFieldUpdater .newUpdater ;
17
- import static java .util .concurrent .atomic .AtomicReferenceFieldUpdater .newUpdater ;
18
12
19
13
/**
20
14
* Created by LinShunkang on 2022/07/17
@@ -33,14 +27,16 @@ public class AtomicIntHashCounter implements IntHashCounter {
33
27
34
28
private static final long IHC_OFFSET = fieldOffset (AtomicIntHashCounter .class , "ihc" );
35
29
36
- private static final long VAL_0_OFFSET = fieldOffset (AtomicIntHashCounter .class , "val0 " );
30
+ private static final long SIZE_OFFSET = fieldOffset (AtomicIntHashCounter .class , "size " );
37
31
38
- private static final int MAX_CAPACITY = MAX_VALUE >> 1 ;
32
+ private static final long VAL_0_OFFSET = fieldOffset ( AtomicIntHashCounter . class , "val0" ) ;
39
33
40
34
private static final int MIN_LOG_SIZE = 4 ;
41
35
42
36
private static final int MAX_LOG_SIZE = 29 ;
43
37
38
+ private static final int MAX_CAPACITY = 1 << MAX_LOG_SIZE ;
39
+
44
40
private static final int MIN_SIZE = 1 << MIN_LOG_SIZE ; // Must be power of 2
45
41
46
42
private static final int RE_PROBE_LIMIT = 10 ;
@@ -59,7 +55,11 @@ public class AtomicIntHashCounter implements IntHashCounter {
59
55
60
56
private IHC ihc ;
61
57
62
- private int val0 ; // Value for Key: NO_KEY
58
+ // Size in active K,V pairs
59
+ private int size ;
60
+
61
+ // Value for Key: NO_KEY
62
+ private int val0 ;
63
63
64
64
public AtomicIntHashCounter () {
65
65
this (MIN_SIZE );
@@ -72,7 +72,8 @@ public AtomicIntHashCounter(int initialCapacity) {
72
72
throw new IllegalArgumentException ("Max initialCapacity need low than " + MAX_CAPACITY );
73
73
}
74
74
75
- this .ihc = new IHC (this , new AtomicInteger (), log2Size (initialCapacity ));
75
+ this .ihc = new IHC (this , log2Size (initialCapacity ));
76
+ this .size = 0 ;
76
77
this .val0 = 0 ;
77
78
}
78
79
@@ -100,6 +101,10 @@ private void helpCopy() {
100
101
topIhc .helpCopy (false );
101
102
}
102
103
104
+ private void incrementSize () {
105
+ UnsafeUtils .getAndAddInt (this , SIZE_OFFSET , 1 );
106
+ }
107
+
103
108
@ Override
104
109
public int get (int key ) {
105
110
if (key == NO_KEY ) {
@@ -143,11 +148,12 @@ public int addAndGet(int key, int delta) {
143
148
144
149
@ Override
145
150
public int size () {
146
- return (val0 == 0 ? 0 : 1 ) + ihc . size () ;
151
+ return (val0 == 0 ? 0 : 1 ) + size ;
147
152
}
148
153
149
154
@ Override
150
155
public void reset () {
156
+ this .size = 0 ;
151
157
this .val0 = 0 ;
152
158
finishCopy ().reset ();
153
159
}
@@ -197,27 +203,26 @@ private static final class IHC implements Serializable {
197
203
198
204
private static final long serialVersionUID = -156965978265678243L ;
199
205
200
- private static final AtomicReferenceFieldUpdater <IHC , IHC > NEXT_IHC_UPDATER =
201
- newUpdater (IHC .class , IHC .class , "nextIhc" );
206
+ private static final long SLOTS_OFFSET = fieldOffset (IHC .class , "slots" );
207
+
208
+ private static final long NEXT_IHC_OFFSET = fieldOffset (IHC .class , "nextIhc" );
202
209
203
- private static final AtomicLongFieldUpdater <IHC > RESIZE_THREADS_UPDATER =
204
- newUpdater (IHC .class , "resizeThreads" );
210
+ private static final long RESIZE_THREADS_OFFSET = fieldOffset (IHC .class , "resizeThreads" );
205
211
206
- private static final AtomicLongFieldUpdater < IHC > COPY_DONE_UPDATER = newUpdater (IHC .class , "copyDone" );
212
+ private static final long COPY_DONE_OFFSET = fieldOffset (IHC .class , "copyDone" );
207
213
208
- private static final AtomicLongFieldUpdater < IHC > COPY_IDX_UPDATER = newUpdater (IHC .class , "copyIdx" );
214
+ private static final long COPY_IDX_OFFSET = fieldOffset (IHC .class , "copyIdx" );
209
215
210
216
// Back-pointer to top-level structure
211
217
private final AtomicIntHashCounter aihc ;
212
218
213
- // Size in active K,V pairs
214
- private final AtomicInteger size ;
215
-
216
- private final AtomicInteger slots ;
219
+ // Count of used slots, to tell when table is full of dead unusable slots
220
+ private volatile int slots ;
217
221
218
222
private volatile IHC nextIhc ;
219
223
220
- private volatile long resizeThreads ; // Count of threads attempting an initial resize
224
+ // Count of threads attempting an initial resize
225
+ private volatile long resizeThreads ;
221
226
222
227
private volatile long copyDone ;
223
228
@@ -229,31 +234,25 @@ private static final class IHC implements Serializable {
229
234
230
235
private final int reProbeLimit ;
231
236
232
- IHC (AtomicIntHashCounter aihc , AtomicInteger size , int logSize ) {
237
+ IHC (AtomicIntHashCounter aihc , int logSize ) {
233
238
this .aihc = aihc ;
234
- this .size = size ;
235
- this .slots = new AtomicInteger (0 );
239
+ this .slots = 0 ;
236
240
this .kvs = new int [(1 << logSize ) << 1 ];
237
241
this .len = len (this .kvs );
238
242
this .reProbeLimit = reProbeLimit (this .len );
239
243
}
240
244
241
245
public void reset () {
242
- UNSAFE . setMemory ( kvs , byteOffset ( 0 ), (( long ) kvs . length ) * I_SCALE , ( byte ) 0 ) ;
246
+ this . slots = 0 ;
243
247
this .nextIhc = null ;
244
- this .size .set (0 );
245
- this .slots .set (0 );
246
248
this .resizeThreads = 0L ;
247
249
this .copyDone = 0L ;
248
250
this .copyIdx = 0L ;
249
- }
250
-
251
- public int size () {
252
- return size .get ();
251
+ UNSAFE .setMemory (kvs , byteOffset (0 ), ((long ) kvs .length ) * I_SCALE , (byte ) 0 );
253
252
}
254
253
255
254
private boolean casNextIhc (IHC newIhc ) {
256
- return NEXT_IHC_UPDATER . compareAndSet (this , null , newIhc );
255
+ return UNSAFE . compareAndSwapObject (this , NEXT_IHC_OFFSET , null , newIhc );
257
256
}
258
257
259
258
private boolean casKv (int idx , long oldKv , long newKv ) {
@@ -294,12 +293,7 @@ private int get(final int key) {
294
293
return nextIhc .get (key ); // Retry in the new table
295
294
}
296
295
297
- if (k == TOMB_PRIME ) {
298
- aihc .helpCopy ();
299
- return nextIhc .get (key ); // Retry in the new table
300
- }
301
-
302
- if (++reProbeTimes >= reProbeLimit ) {
296
+ if (++reProbeTimes >= reProbeLimit || k == TOMB_PRIME ) {
303
297
if (nextIhc != null ) { // Table copy in progress?
304
298
aihc .helpCopy ();
305
299
return nextIhc .get (key ); // Retry in the new table
@@ -323,10 +317,20 @@ private int addDelta(final int key, final int delta, final boolean fromTableCopy
323
317
k = key (kv );
324
318
v = value (kv );
325
319
if (k == NO_KEY ) { //No key!
320
+ assert v == 0 ;
326
321
if (casKv (idx , kv , kv (key , delta ))) {
327
- slots .addAndGet (1 );
328
322
if (!fromTableCopy ) {
329
- size .addAndGet (1 );
323
+ aihc .incrementSize ();
324
+ }
325
+
326
+ // See if we want to move to a new table (to avoid high average re-probe counts).
327
+ // We only check on the initial set of a Value from zero to not-zero
328
+ final int slots = UnsafeUtils .getAndAddInt (this , SLOTS_OFFSET , 1 );
329
+ if (slots >= (len >> 1 ) + (len >> 2 )) { // Table is full? slots > len * 3/4
330
+ resize (); // Force the new table copy to start
331
+ if (!fromTableCopy ) {
332
+ aihc .helpCopy (); // help along an existing copy
333
+ }
330
334
}
331
335
return v ;
332
336
}
@@ -347,16 +351,6 @@ private int addDelta(final int key, final int delta, final boolean fromTableCopy
347
351
return nextIhc .addDelta (key , delta , v != TOMB_PRIME );
348
352
}
349
353
350
- // See if we want to move to a new table (to avoid high average re-probe counts).
351
- // We only check on the initial set of a Value from zero to not-zero
352
- if (v == 0 && slots .get () >= len >> 1 ) {
353
- final IHC newIhc = resize (); // Force the new table copy to start
354
- if (!fromTableCopy ) {
355
- aihc .helpCopy (); // help along an existing copy
356
- }
357
- return newIhc .addDelta (key , delta , fromTableCopy );
358
- }
359
-
360
354
// Try to increase the value with delta
361
355
if (casValue (idx , v , v + delta )) {
362
356
return v ;
@@ -390,16 +384,13 @@ private IHC resize() {
390
384
// Prevent integer overflow - limit of 2^31 elements in a Java array.
391
385
// So here, 2^30 is the largest number of elements in the hash table
392
386
if (log2 > MAX_LOG_SIZE ) {
393
- throw new RuntimeException ("Table is full, size=" + size + ", newSize=" + newSize + ", log2=" + log2 );
387
+ throw new RuntimeException ("Table is full, size=" + aihc . size + ", newSize=" + newSize );
394
388
}
395
389
396
390
// Now limit the number of threads actually allocating memory to a
397
391
// handful - lest we have 750 threads all trying to allocate a giant
398
392
// resized array.
399
- long r = resizeThreads ;
400
- while (!RESIZE_THREADS_UPDATER .compareAndSet (this , r , r + 1 )) {
401
- r = resizeThreads ;
402
- }
393
+ final long r = UnsafeUtils .getAndAddLong (this , RESIZE_THREADS_OFFSET , 1 );
403
394
404
395
// Size calculation: 2 words (K+V) per table entry, plus a handful. We
405
396
// guess at 64-bit pointers; 32-bit pointers screws up the size calc by
@@ -428,7 +419,7 @@ private IHC resize() {
428
419
}
429
420
430
421
// New IHC - actually allocate the big arrays
431
- newIhc = new IHC (aihc , this . size , log2 );
422
+ newIhc = new IHC (aihc , log2 );
432
423
433
424
// Another check after the slow allocation
434
425
if (nextIhc != null ) { // See if resize is already in progress
@@ -461,11 +452,7 @@ private void helpCopy(final boolean copyAll) {
461
452
// algorithm) or do the copy work ourselves. Tiny tables with huge
462
453
// thread counts trying to copy the table often 'panic'.
463
454
if (panicStart == -1 ) { // No panic?
464
- copyIdx = (int ) this .copyIdx ;
465
- while (!COPY_IDX_UPDATER .compareAndSet (this , copyIdx , copyIdx + MIN_COPY_WORK )) {
466
- copyIdx = (int ) this .copyIdx ; // Re-read
467
- }
468
-
455
+ copyIdx = (int ) UnsafeUtils .getAndAddLong (this , COPY_IDX_OFFSET , MIN_COPY_WORK );
469
456
if (!(copyIdx < (oldLen << 1 ))) { // Panic!
470
457
panicStart = copyIdx ; // Record where we started to panic-copy
471
458
}
@@ -525,10 +512,8 @@ private void copyCheckAndPromote(int workDone) {
525
512
long copyDone = this .copyDone ;
526
513
assert (copyDone + workDone ) <= oldLen ;
527
514
if (workDone > 0 ) {
528
- while (!COPY_DONE_UPDATER .compareAndSet (this , copyDone , copyDone + workDone )) {
529
- copyDone = this .copyDone ; // Reload, retry
530
- assert (copyDone + workDone ) <= oldLen ;
531
- }
515
+ copyDone = UnsafeUtils .getAndAddLong (this , COPY_DONE_OFFSET , workDone );
516
+ assert (copyDone + workDone ) <= oldLen ;
532
517
}
533
518
534
519
// Check for copy being ALL done, and promote. Note that we might have
0 commit comments