Skip to content

Commit 5bffe8b

Browse files
authored
First part of fix for #400 (#405)
1 parent 4260f58 commit 5bffe8b

File tree

6 files changed

+175
-36
lines changed

6 files changed

+175
-36
lines changed

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/AvroFactory.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import com.fasterxml.jackson.core.format.InputAccessor;
88
import com.fasterxml.jackson.core.format.MatchStrength;
99
import com.fasterxml.jackson.core.io.IOContext;
10-
10+
import com.fasterxml.jackson.core.util.RecyclerPool;
11+
import com.fasterxml.jackson.dataformat.avro.apacheimpl.ApacheCodecRecycler;
12+
import com.fasterxml.jackson.dataformat.avro.apacheimpl.AvroRecyclerPools;
1113
import com.fasterxml.jackson.dataformat.avro.deser.*;
1214

1315
/**
@@ -40,6 +42,12 @@ public class AvroFactory extends JsonFactory
4042
/**********************************************************
4143
*/
4244

45+
/**
46+
* @since 2.16
47+
*/
48+
protected RecyclerPool<ApacheCodecRecycler> _avroRecyclerPool
49+
= AvroRecyclerPools.defaultPool();
50+
4351
protected int _avroParserFeatures;
4452

4553
protected int _avroGeneratorFeatures;
@@ -463,6 +471,7 @@ protected AvroGenerator _createGenerator(OutputStream out, IOContext ctxt) throw
463471
{
464472
int feats = _avroGeneratorFeatures;
465473
AvroGenerator gen = new AvroGenerator(ctxt, _generatorFeatures, feats,
474+
_avroRecyclerPool.acquireAndLinkPooled(),
466475
_objectCodec, out);
467476
return gen;
468477
}

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/AvroGenerator.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,12 @@ private Feature(boolean defaultState) {
9393
* @since 2.16
9494
*/
9595
protected final static EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
96-
96+
97+
/**
98+
* @since 2.16
99+
*/
100+
protected ApacheCodecRecycler _apacheCodecRecycler;
101+
97102
/**
98103
* @since 2.16
99104
*/
@@ -144,6 +149,7 @@ private Feature(boolean defaultState) {
144149
*/
145150

146151
public AvroGenerator(IOContext ctxt, int jsonFeatures, int avroFeatures,
152+
ApacheCodecRecycler apacheCodecRecycler,
147153
ObjectCodec codec, OutputStream output)
148154
throws IOException
149155
{
@@ -153,8 +159,9 @@ public AvroGenerator(IOContext ctxt, int jsonFeatures, int avroFeatures,
153159
_output = output;
154160
_avroContext = AvroWriteContext.nullContext();
155161

162+
_apacheCodecRecycler = apacheCodecRecycler;
156163
final boolean buffering = isEnabled(Feature.AVRO_BUFFERING);
157-
BinaryEncoder encoderToReuse = ApacheCodecRecycler.acquireEncoder();
164+
BinaryEncoder encoderToReuse = _apacheCodecRecycler.acquireEncoder();
158165
_encoder = buffering
159166
? ENCODER_FACTORY.binaryEncoder(output, encoderToReuse)
160167
: ENCODER_FACTORY.directBinaryEncoder(output, encoderToReuse);
@@ -626,10 +633,15 @@ protected final void _verifyValueWrite(String typeMsg) throws IOException {
626633
@Override
627634
protected void _releaseBuffers() {
628635
// no super implementation to call
629-
BinaryEncoder e = _encoder;
630-
if (e != null) {
631-
_encoder = null;
632-
ApacheCodecRecycler.release(e);
636+
ApacheCodecRecycler recycler = _apacheCodecRecycler;
637+
if (recycler != null) {
638+
_apacheCodecRecycler = null;
639+
BinaryEncoder e = _encoder;
640+
if (e != null) {
641+
_encoder = null;
642+
recycler.release(e);
643+
}
644+
recycler.releaseToPool();
633645
}
634646
}
635647

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/ApacheAvroFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,14 @@ public AvroFactory copy()
4545
@Override
4646
protected AvroParser _createParser(InputStream in, IOContext ctxt) throws IOException {
4747
return new ApacheAvroParserImpl(ctxt, _parserFeatures, _avroParserFeatures,
48+
_avroRecyclerPool.acquireAndLinkPooled(),
4849
_objectCodec, in);
4950
}
5051

5152
@Override
5253
protected AvroParser _createParser(byte[] data, int offset, int len, IOContext ctxt) throws IOException {
5354
return new ApacheAvroParserImpl(ctxt, _parserFeatures, _avroParserFeatures,
55+
_avroRecyclerPool.acquireAndLinkPooled(),
5456
_objectCodec, data, offset, len);
5557
}
5658
}

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/ApacheAvroParserImpl.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.io.Writer;
66

77
import org.apache.avro.io.BinaryDecoder;
8+
import org.apache.avro.io.BinaryEncoder;
89
import org.apache.avro.io.DecoderFactory;
910

1011
import com.fasterxml.jackson.core.*;
@@ -18,11 +19,22 @@
1819
*/
1920
public class ApacheAvroParserImpl extends AvroParserImpl
2021
{
22+
/*
23+
/**********************************************************
24+
/* Configuration
25+
/**********************************************************
26+
*/
27+
2128
/**
2229
* @since 2.16
2330
*/
2431
protected final static DecoderFactory DECODER_FACTORY = DecoderFactory.get();
2532

33+
/**
34+
* @since 2.16
35+
*/
36+
protected ApacheCodecRecycler _apacheCodecRecycler;
37+
2638
/*
2739
/**********************************************************
2840
/* Input source config
@@ -71,6 +83,7 @@ public class ApacheAvroParserImpl extends AvroParserImpl
7183
*/
7284

7385
public ApacheAvroParserImpl(IOContext ctxt, int parserFeatures, int avroFeatures,
86+
ApacheCodecRecycler apacheCodecRecycler,
7487
ObjectCodec codec, InputStream in)
7588
{
7689
super(ctxt, parserFeatures, avroFeatures, codec);
@@ -80,20 +93,23 @@ public ApacheAvroParserImpl(IOContext ctxt, int parserFeatures, int avroFeatures
8093
_inputEnd = 0;
8194
_bufferRecyclable = true;
8295

96+
_apacheCodecRecycler = apacheCodecRecycler;
8397
final boolean buffering = Feature.AVRO_BUFFERING.enabledIn(avroFeatures);
84-
BinaryDecoder decoderToReuse = ApacheCodecRecycler.acquireDecoder();
98+
BinaryDecoder decoderToReuse = apacheCodecRecycler.acquireDecoder();
8599
_decoder = buffering
86100
? DECODER_FACTORY.binaryDecoder(in, decoderToReuse)
87101
: DECODER_FACTORY.directBinaryDecoder(in, decoderToReuse);
88102
}
89103

90104
public ApacheAvroParserImpl(IOContext ctxt, int parserFeatures, int avroFeatures,
105+
ApacheCodecRecycler apacheCodecRecycler,
91106
ObjectCodec codec,
92107
byte[] buffer, int offset, int len)
93108
{
94109
super(ctxt, parserFeatures, avroFeatures, codec);
95110
_inputStream = null;
96-
BinaryDecoder decoderToReuse = ApacheCodecRecycler.acquireDecoder();
111+
_apacheCodecRecycler = apacheCodecRecycler;
112+
BinaryDecoder decoderToReuse = apacheCodecRecycler.acquireDecoder();
97113
_decoder = DECODER_FACTORY.binaryDecoder(buffer, offset, len, decoderToReuse);
98114
}
99115

@@ -107,14 +123,18 @@ protected void _releaseBuffers() throws IOException {
107123
_ioContext.releaseReadIOBuffer(buf);
108124
}
109125
}
110-
BinaryDecoder d = _decoder;
111-
if (d != null) {
112-
_decoder = null;
113-
ApacheCodecRecycler.release(d);
126+
ApacheCodecRecycler recycler = _apacheCodecRecycler;
127+
if (recycler != null) {
128+
_apacheCodecRecycler = null;
129+
BinaryDecoder d = _decoder;
130+
if (d != null) {
131+
_decoder = null;
132+
recycler.release(d);
133+
}
134+
recycler.releaseToPool();
114135
}
115136
}
116137

117-
118138
/*
119139
/**********************************************************
120140
/* Abstract method impls, i/o access
Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.fasterxml.jackson.dataformat.avro.apacheimpl;
22

3-
import java.lang.ref.SoftReference;
3+
import java.util.Objects;
44
import java.util.concurrent.atomic.AtomicReference;
55

6+
import com.fasterxml.jackson.core.util.RecyclerPool;
7+
import com.fasterxml.jackson.core.util.RecyclerPool.WithPool;
8+
69
import org.apache.avro.io.*;
710

811
/**
@@ -12,51 +15,74 @@
1215
* @since 2.8.7
1316
*/
1417
public final class ApacheCodecRecycler
18+
implements WithPool<ApacheCodecRecycler>
1519
{
16-
protected final static ThreadLocal<SoftReference<ApacheCodecRecycler>> _recycler
17-
= new ThreadLocal<SoftReference<ApacheCodecRecycler>>();
18-
1920
private final AtomicReference<BinaryDecoder> decoderRef = new AtomicReference<>();
2021
private final AtomicReference<BinaryEncoder> encoderRef = new AtomicReference<>();
2122

22-
private ApacheCodecRecycler() { }
23+
private RecyclerPool<ApacheCodecRecycler> _pool;
24+
25+
ApacheCodecRecycler() { }
2326

2427
/*
2528
/**********************************************************
2629
/* Public API
2730
/**********************************************************
2831
*/
2932

30-
public static BinaryDecoder acquireDecoder() {
31-
return _recycler().decoderRef.getAndSet(null);
33+
public BinaryDecoder acquireDecoder() {
34+
return decoderRef.getAndSet(null);
3235
}
3336

34-
public static BinaryEncoder acquireEncoder() {
35-
return _recycler().encoderRef.getAndSet(null);
37+
public BinaryEncoder acquireEncoder() {
38+
return encoderRef.getAndSet(null);
3639
}
3740

38-
public static void release(BinaryDecoder dec) {
39-
_recycler().decoderRef.set(dec);
41+
public void release(BinaryDecoder dec) {
42+
decoderRef.set(dec);
4043
}
4144

42-
public static void release(BinaryEncoder enc) {
43-
_recycler().encoderRef.set(enc);
45+
public void release(BinaryEncoder enc) {
46+
encoderRef.set(enc);
4447
}
4548

4649
/*
4750
/**********************************************************
48-
/* Internal per-instance methods
51+
/* WithPool implementation
4952
/**********************************************************
5053
*/
54+
55+
/**
56+
* Method called by owner of this recycler instance, to provide reference to
57+
* {@link RecyclerPool} into which instance is to be released (if any)
58+
*
59+
* @since 2.16
60+
*/
61+
@Override
62+
public ApacheCodecRecycler withPool(RecyclerPool<ApacheCodecRecycler> pool) {
63+
if (this._pool != null) {
64+
throw new IllegalStateException("ApacheCodecRecycler already linked to pool: "+pool);
65+
}
66+
// assign to pool to which this BufferRecycler belongs in order to release it
67+
// to the same pool when the work will be completed
68+
_pool = Objects.requireNonNull(pool);
69+
return this;
70+
}
5171

52-
private static ApacheCodecRecycler _recycler() {
53-
SoftReference<ApacheCodecRecycler> ref = _recycler.get();
54-
ApacheCodecRecycler r = (ref == null) ? null : ref.get();
55-
56-
if (r == null) {
57-
r = new ApacheCodecRecycler();
58-
_recycler.set(new SoftReference<>(r));
72+
/**
73+
* Method called when owner of this recycler no longer wishes use it; this should
74+
* return it to pool passed via {@code withPool()} (if any).
75+
*
76+
* @since 2.16
77+
*/
78+
@Override
79+
public void releaseToPool() {
80+
if (_pool != null) {
81+
RecyclerPool<ApacheCodecRecycler> tmpPool = _pool;
82+
// nullify the reference to the pool in order to avoid the risk of releasing
83+
// the same BufferRecycler more than once, thus compromising the pool integrity
84+
_pool = null;
85+
tmpPool.releasePooled(this);
5986
}
60-
return r;
6187
}
6288
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.fasterxml.jackson.dataformat.avro.apacheimpl;
2+
3+
import java.lang.ref.SoftReference;
4+
5+
import com.fasterxml.jackson.core.util.BufferRecycler;
6+
import com.fasterxml.jackson.core.util.RecyclerPool;
7+
8+
public final class AvroRecyclerPools
9+
{
10+
/**
11+
* @return the default {@link RecyclerPool} implementation
12+
* which is the thread local based one:
13+
* basically alias to {@link #threadLocalPool()}).
14+
*/
15+
public static RecyclerPool<ApacheCodecRecycler> defaultPool() {
16+
return threadLocalPool();
17+
}
18+
19+
/**
20+
* Accessor for getting the shared/global {@link ThreadLocalPool} instance
21+
* (due to design only one instance ever needed)
22+
*
23+
* @return Globally shared instance of {@link ThreadLocalPool}
24+
*/
25+
public static RecyclerPool<ApacheCodecRecycler> threadLocalPool() {
26+
return ThreadLocalPool.GLOBAL;
27+
}
28+
29+
/*
30+
/**********************************************************************
31+
/* Concrete RecyclerPool implementations for recycling BufferRecyclers
32+
/**********************************************************************
33+
*/
34+
35+
/**
36+
* {@link ThreadLocal}-based {@link RecyclerPool} implementation used for
37+
* recycling {@link BufferRecycler} instances:
38+
* see {@link RecyclerPool.ThreadLocalPoolBase} for full explanation
39+
* of functioning.
40+
*/
41+
public static class ThreadLocalPool
42+
extends RecyclerPool.ThreadLocalPoolBase<ApacheCodecRecycler>
43+
{
44+
private static final long serialVersionUID = 1L;
45+
46+
protected static final ThreadLocalPool GLOBAL = new ThreadLocalPool();
47+
48+
protected final static ThreadLocal<SoftReference<ApacheCodecRecycler>> _recycler
49+
= new ThreadLocal<SoftReference<ApacheCodecRecycler>>();
50+
51+
private ThreadLocalPool() { }
52+
53+
@Override
54+
public ApacheCodecRecycler acquirePooled() {
55+
SoftReference<ApacheCodecRecycler> ref = _recycler.get();
56+
ApacheCodecRecycler r = (ref == null) ? null : ref.get();
57+
58+
if (r == null) {
59+
r = new ApacheCodecRecycler();
60+
_recycler.set(new SoftReference<>(r));
61+
}
62+
return r;
63+
}
64+
65+
// // // JDK serialization support
66+
67+
protected Object readResolve() { return GLOBAL; }
68+
}
69+
70+
}

0 commit comments

Comments
 (0)