Skip to content

Commit 60640d6

Browse files
authored
Fix #400 (#406)
1 parent 5bffe8b commit 60640d6

File tree

3 files changed

+214
-5
lines changed

3 files changed

+214
-5
lines changed

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
public final class ApacheCodecRecycler
1818
implements WithPool<ApacheCodecRecycler>
1919
{
20+
// NOTE: AtomicReference only needed for ThreadLocal recycling where
21+
// single-thread access is not (ironically enough) ensured
2022
private final AtomicReference<BinaryDecoder> decoderRef = new AtomicReference<>();
23+
24+
// NOTE: AtomicReference only needed for ThreadLocal recycling where
25+
// single-thread access is not (ironically enough) ensured
2126
private final AtomicReference<BinaryEncoder> encoderRef = new AtomicReference<>();
2227

2328
private RecyclerPool<ApacheCodecRecycler> _pool;

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/AvroRecyclerPools.java

Lines changed: 205 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package com.fasterxml.jackson.dataformat.avro.apacheimpl;
22

33
import java.lang.ref.SoftReference;
4+
import java.util.concurrent.ArrayBlockingQueue;
5+
import java.util.concurrent.ConcurrentLinkedDeque;
46

5-
import com.fasterxml.jackson.core.util.BufferRecycler;
67
import com.fasterxml.jackson.core.util.RecyclerPool;
8+
import com.fasterxml.jackson.core.util.RecyclerPool.BoundedPoolBase;
9+
import com.fasterxml.jackson.core.util.RecyclerPool.ConcurrentDequePoolBase;
10+
import com.fasterxml.jackson.core.util.RecyclerPool.LockFreePoolBase;
711

812
public final class AvroRecyclerPools
913
{
@@ -26,6 +30,72 @@ public static RecyclerPool<ApacheCodecRecycler> threadLocalPool() {
2630
return ThreadLocalPool.GLOBAL;
2731
}
2832

33+
/**
34+
* Accessor for getting the shared/global {@link NonRecyclingPool} instance
35+
* (due to design only one instance ever needed)
36+
*
37+
* @return Globally shared instance of {@link NonRecyclingPool}.
38+
*/
39+
public static RecyclerPool<ApacheCodecRecycler> nonRecyclingPool() {
40+
return NonRecyclingPool.GLOBAL;
41+
}
42+
43+
/**
44+
* Accessor for getting the shared/global {@link ConcurrentDequePool} instance.
45+
*
46+
* @return Globally shared instance of {@link NonRecyclingPool}.
47+
*/
48+
public static RecyclerPool<ApacheCodecRecycler> sharedConcurrentDequePool() {
49+
return ConcurrentDequePool.GLOBAL;
50+
}
51+
52+
/**
53+
* Accessor for constructing a new, non-shared {@link ConcurrentDequePool} instance.
54+
*
55+
* @return Globally shared instance of {@link NonRecyclingPool}.
56+
*/
57+
public static RecyclerPool<ApacheCodecRecycler> newConcurrentDequePool() {
58+
return ConcurrentDequePool.construct();
59+
}
60+
61+
/**
62+
* Accessor for getting the shared/global {@link LockFreePool} instance.
63+
*
64+
* @return Globally shared instance of {@link LockFreePool}.
65+
*/
66+
public static RecyclerPool<ApacheCodecRecycler> sharedLockFreePool() {
67+
return LockFreePool.GLOBAL;
68+
}
69+
70+
/**
71+
* Accessor for constructing a new, non-shared {@link LockFreePool} instance.
72+
*
73+
* @return Globally shared instance of {@link LockFreePool}.
74+
*/
75+
public static RecyclerPool<ApacheCodecRecycler> newLockFreePool() {
76+
return LockFreePool.construct();
77+
}
78+
79+
/**
80+
* Accessor for getting the shared/global {@link BoundedPool} instance.
81+
*
82+
* @return Globally shared instance of {@link BoundedPool}.
83+
*/
84+
public static RecyclerPool<ApacheCodecRecycler> sharedBoundedPool() {
85+
return BoundedPool.GLOBAL;
86+
}
87+
88+
/**
89+
* Accessor for constructing a new, non-shared {@link BoundedPool} instance.
90+
*
91+
* @param size Maximum number of values to pool
92+
*
93+
* @return Globally shared instance of {@link BoundedPool}.
94+
*/
95+
public static RecyclerPool<ApacheCodecRecycler> newBoundedPool(int size) {
96+
return BoundedPool.construct(size);
97+
}
98+
2999
/*
30100
/**********************************************************************
31101
/* Concrete RecyclerPool implementations for recycling BufferRecyclers
@@ -34,7 +104,7 @@ public static RecyclerPool<ApacheCodecRecycler> threadLocalPool() {
34104

35105
/**
36106
* {@link ThreadLocal}-based {@link RecyclerPool} implementation used for
37-
* recycling {@link BufferRecycler} instances:
107+
* recycling {@link ApacheCodecRecycler} instances:
38108
* see {@link RecyclerPool.ThreadLocalPoolBase} for full explanation
39109
* of functioning.
40110
*/
@@ -66,5 +136,137 @@ public ApacheCodecRecycler acquirePooled() {
66136

67137
protected Object readResolve() { return GLOBAL; }
68138
}
69-
139+
140+
/**
141+
* Dummy {@link RecyclerPool} implementation that does not recycle
142+
* anything but simply creates new instances when asked to acquire items.
143+
*/
144+
public static class NonRecyclingPool
145+
extends RecyclerPool.NonRecyclingPoolBase<ApacheCodecRecycler>
146+
{
147+
private static final long serialVersionUID = 1L;
148+
149+
protected static final NonRecyclingPool GLOBAL = new NonRecyclingPool();
150+
151+
protected NonRecyclingPool() { }
152+
153+
@Override
154+
public ApacheCodecRecycler acquirePooled() {
155+
return new ApacheCodecRecycler();
156+
}
157+
158+
// // // JDK serialization support
159+
160+
protected Object readResolve() { return GLOBAL; }
161+
}
162+
163+
/**
164+
* {@link RecyclerPool} implementation that uses
165+
* {@link ConcurrentLinkedDeque} for recycling instances.
166+
*<p>
167+
* Pool is unbounded: see {@link RecyclerPool} what this means.
168+
*/
169+
public static class ConcurrentDequePool extends ConcurrentDequePoolBase<ApacheCodecRecycler>
170+
{
171+
private static final long serialVersionUID = 1L;
172+
173+
protected static final ConcurrentDequePool GLOBAL = new ConcurrentDequePool(SERIALIZATION_SHARED);
174+
175+
// // // Life-cycle (constructors, factory methods)
176+
177+
protected ConcurrentDequePool(int serialization) {
178+
super(serialization);
179+
}
180+
181+
public static ConcurrentDequePool construct() {
182+
return new ConcurrentDequePool(SERIALIZATION_NON_SHARED);
183+
}
184+
185+
@Override
186+
public ApacheCodecRecycler createPooled() {
187+
return new ApacheCodecRecycler();
188+
}
189+
190+
// // // JDK serialization support
191+
192+
// Make sure to re-link to global/shared or non-shared.
193+
protected Object readResolve() {
194+
return _resolveToShared(GLOBAL).orElseGet(() -> construct());
195+
}
196+
}
197+
198+
/**
199+
* {@link RecyclerPool} implementation that uses
200+
* a lock free linked list for recycling instances.
201+
*<p>
202+
* Pool is unbounded: see {@link RecyclerPool} for
203+
* details on what this means.
204+
*/
205+
public static class LockFreePool extends LockFreePoolBase<ApacheCodecRecycler>
206+
{
207+
private static final long serialVersionUID = 1L;
208+
209+
protected static final LockFreePool GLOBAL = new LockFreePool(SERIALIZATION_SHARED);
210+
211+
// // // Life-cycle (constructors, factory methods)
212+
213+
protected LockFreePool(int serialization) {
214+
super(serialization);
215+
}
216+
217+
public static LockFreePool construct() {
218+
return new LockFreePool(SERIALIZATION_NON_SHARED);
219+
}
220+
221+
@Override
222+
public ApacheCodecRecycler createPooled() {
223+
return new ApacheCodecRecycler();
224+
}
225+
226+
// // // JDK serialization support
227+
228+
// Make sure to re-link to global/shared or non-shared.
229+
protected Object readResolve() {
230+
return _resolveToShared(GLOBAL).orElseGet(() -> construct());
231+
}
232+
}
233+
234+
/**
235+
* {@link RecyclerPool} implementation that uses
236+
* a bounded queue ({@link ArrayBlockingQueue} for recycling instances.
237+
* This is "bounded" pool since it will never hold on to more
238+
* {@link ApacheCodecRecycler} instances than its size configuration:
239+
* the default size is {@link BoundedPoolBase#DEFAULT_CAPACITY}.
240+
*/
241+
public static class BoundedPool extends BoundedPoolBase<ApacheCodecRecycler>
242+
{
243+
private static final long serialVersionUID = 1L;
244+
245+
protected static final BoundedPool GLOBAL = new BoundedPool(SERIALIZATION_SHARED);
246+
247+
// // // Life-cycle (constructors, factory methods)
248+
249+
protected BoundedPool(int capacityAsId) {
250+
super(capacityAsId);
251+
}
252+
253+
public static BoundedPool construct(int capacity) {
254+
if (capacity <= 0) {
255+
throw new IllegalArgumentException("capacity must be > 0, was: "+capacity);
256+
}
257+
return new BoundedPool(capacity);
258+
}
259+
260+
@Override
261+
public ApacheCodecRecycler createPooled() {
262+
return new ApacheCodecRecycler();
263+
}
264+
265+
// // // JDK serialization support
266+
267+
// Make sure to re-link to global/shared or non-shared.
268+
protected Object readResolve() {
269+
return _resolveToShared(GLOBAL).orElseGet(() -> construct(_serialization));
270+
}
271+
}
70272
}

release-notes/VERSION-2.x

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@ Active maintainers:
1616

1717
2.16.0 (not yet released)
1818

19-
#403: Remove Smile-specific buffer-recycling
19+
#400: (avro) Rewrite Avro buffer recycling (`ApacheCodecRecycler.java`) to
20+
use new `RecyclerPool`, allow configuring use of non-ThreadLocal based pools
21+
#403: (smile) Remove Smile-specific buffer-recycling
2022

2123
2.15.3 (12-Oct-2023)
2224

23-
#384: `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency
25+
#384: (smile) `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency
2426
(reported by Simon D)
2527

2628
2.15.2 (30-May-2023)

0 commit comments

Comments
 (0)