Skip to content

Commit c735002

Browse files
committed
Add tests and fix for concurrency problems wrt avro encoder/decoder
1 parent e910aec commit c735002

File tree

3 files changed

+133
-12
lines changed

3 files changed

+133
-12
lines changed

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/AvroSchema.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,26 @@ public String getSchemaType() {
5050

5151
public static BinaryDecoder decoder(InputStream in, boolean buffering)
5252
{
53+
/*
5354
SoftReference<BinaryDecoder> ref = decoderRecycler.get();
5455
BinaryDecoder prev = (ref == null) ? null : ref.get();
55-
/* Factory will check if the decoder has a matching type for reuse.
56-
* If not, it will drop the instance being reused and will return
57-
* a new, proper one.
58-
*/
56+
// Factory will check if the decoder has a matching type for reuse.
57+
// If not, it will drop the instance being reused and will return a new, proper one.
5958
BinaryDecoder next = buffering
6059
? DECODER_FACTORY.binaryDecoder(in, prev)
6160
: DECODER_FACTORY.directBinaryDecoder(in, prev);
6261
6362
decoderRecycler.set(new SoftReference<BinaryDecoder>(next));
6463
return next;
64+
*/
65+
return buffering
66+
? DECODER_FACTORY.binaryDecoder(in, null)
67+
: DECODER_FACTORY.directBinaryDecoder(in, null);
6568
}
6669

6770
public static BinaryDecoder decoder(byte[] buffer, int offset, int len)
6871
{
72+
/*
6973
SoftReference<BinaryDecoder> ref = decoderRecycler.get();
7074
BinaryDecoder prev = (ref == null) ? null : ref.get();
7175
@@ -75,25 +79,28 @@ public static BinaryDecoder decoder(byte[] buffer, int offset, int len)
7579
prev = DECODER_FACTORY.binaryDecoder(buffer, offset, len, null);
7680
decoderRecycler.set(new SoftReference<BinaryDecoder>(prev));
7781
return prev;
82+
*/
83+
return DECODER_FACTORY.binaryDecoder(buffer, offset, len, null);
7884
}
79-
80-
85+
8186
public static BinaryEncoder encoder(OutputStream out, boolean buffering)
8287
{
88+
/*
8389
SoftReference<BinaryEncoder> ref = encoderRecycler.get();
8490
BinaryEncoder prev = (ref == null) ? null : ref.get();
85-
/* Factory will check if the encoder has a matching type for reuse.
86-
* If not, it will drop the instance being reused and will return
87-
* a new, properly initialized instance
88-
*/
91+
// Factory will check if the decoder has a matching type for reuse.
92+
// If not, it will drop the instance being reused and will return a new, proper one.
8993
BinaryEncoder next =
9094
buffering
9195
? ENCODER_FACTORY.binaryEncoder(out, prev)
9296
: ENCODER_FACTORY.directBinaryEncoder(out, prev);
9397
9498
encoderRecycler.set(new SoftReference<BinaryEncoder>(next));
95-
9699
return next;
100+
*/
101+
return buffering
102+
? ENCODER_FACTORY.binaryEncoder(out, null)
103+
: ENCODER_FACTORY.directBinaryEncoder(out, null);
97104
}
98105

99106
public AvroStructureReader getReader()
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package com.fasterxml.jackson.dataformat.avro;
2+
3+
import java.io.*;
4+
5+
import com.fasterxml.jackson.databind.MappingIterator;
6+
import com.fasterxml.jackson.databind.SequenceWriter;
7+
8+
/**
9+
* Simple tests for some testable aspects of concurrent usage.
10+
* Specifically tries to ensure that usage of multiple parsers,
11+
* generators, from same thread, does not cause problems with
12+
* reuse of certain components.
13+
*/
14+
public class ConcurrencyTest extends AvroTestBase
15+
{
16+
private final AvroMapper MAPPER = getMapper();
17+
18+
private final AvroSchema EMPL_SCHEMA;
19+
20+
public ConcurrencyTest() throws IOException {
21+
EMPL_SCHEMA = getEmployeeSchema();
22+
}
23+
24+
// Simple test that creates 2 encoders and uses them in interleaved manner.
25+
// This should tease out simplest problems with possible encoder reuse.
26+
public void testMultipleEncoders() throws Exception
27+
{
28+
ByteArrayOutputStream b1 = new ByteArrayOutputStream();
29+
ByteArrayOutputStream b2 = new ByteArrayOutputStream();
30+
SequenceWriter sw1 = MAPPER.writer(EMPL_SCHEMA)
31+
.writeValues(b1);
32+
SequenceWriter sw2 = MAPPER.writer(EMPL_SCHEMA)
33+
.writeValues(b2);
34+
35+
for (int i = 0; i < 200; ++i) {
36+
_writeEmpl(sw1, "foo", i);
37+
_writeEmpl(sw2, "foo", i);
38+
}
39+
sw1.close();
40+
sw2.close();
41+
assertEquals(b1.size(), b2.size());
42+
// value just verified once, but since Avro format stable should remain stable
43+
assertEquals(6926, b1.size());
44+
}
45+
46+
public void testMultipleDecodersBlock() throws Exception {
47+
_testMultipleDecoders(false);
48+
}
49+
50+
public void testMultipleDecodersStreaming() throws Exception {
51+
_testMultipleDecoders(true);
52+
}
53+
54+
private void _testMultipleDecoders(boolean useStream) throws Exception
55+
{
56+
final int ROUNDS = 40;
57+
// Here let's do encoding linearly, to remove coupling with other test(s)
58+
ByteArrayOutputStream b = new ByteArrayOutputStream();
59+
SequenceWriter sw = MAPPER.writer(EMPL_SCHEMA)
60+
.writeValues(b);
61+
for (int i = 0; i < ROUNDS; ++i) {
62+
_writeEmpl(sw, "a", i);
63+
}
64+
sw.close();
65+
final byte[] b1 = b.toByteArray();
66+
67+
b = new ByteArrayOutputStream();
68+
sw = MAPPER.writer(EMPL_SCHEMA)
69+
.writeValues(b);
70+
for (int i = 0; i < ROUNDS; ++i) {
71+
_writeEmpl(sw, "b", i);
72+
}
73+
sw.close();
74+
final byte[] b2 = b.toByteArray();
75+
76+
MappingIterator<Employee> it1, it2;
77+
78+
if (useStream) {
79+
it1 = MAPPER.readerFor(Employee.class).with(EMPL_SCHEMA)
80+
.readValues(b1);
81+
it2 = MAPPER.readerFor(Employee.class).with(EMPL_SCHEMA)
82+
.readValues(b2);
83+
} else {
84+
it1 = MAPPER.readerFor(Employee.class).with(EMPL_SCHEMA)
85+
.readValues(new ByteArrayInputStream(b1));
86+
it2 = MAPPER.readerFor(Employee.class).with(EMPL_SCHEMA)
87+
.readValues(new ByteArrayInputStream(b2));
88+
}
89+
90+
for (int i = 0; i < 40; ++i) {
91+
assertTrue(it1.hasNextValue());
92+
assertTrue(it2.hasNextValue());
93+
Employee e1 = it1.nextValue();
94+
Employee e2 = it2.nextValue();
95+
96+
assertEquals("Empl"+i+"a", e1.name);
97+
assertEquals("Empl"+i+"b", e2.name);
98+
assertEquals(10+i, e1.age);
99+
assertEquals(10+i, e2.age);
100+
}
101+
assertFalse(it1.hasNextValue());
102+
assertFalse(it2.hasNextValue());
103+
it1.close();
104+
it2.close();
105+
}
106+
107+
private void _writeEmpl(SequenceWriter sw, String type, int index) throws IOException {
108+
sw.write(_empl(type, index));
109+
}
110+
111+
private Employee _empl(String type, int index) {
112+
return new Employee("Empl"+index+type, 10+index, new String[] { "empl"+index+"@company.com" }, null);
113+
}
114+
}

avro/src/test/java/com/fasterxml/jackson/dataformat/avro/RootSequenceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void testReadWriteEmployeeSequence() throws Exception
7979
Employee peon2 = new Employee("Worker#2", 42, new String[] { "worker2@company.com" }, boss);
8080

8181
// First: write a sequence of 3 root-level Employee Objects
82-
82+
8383
SequenceWriter sw = MAPPER.writerFor(Employee.class)
8484
.with(getEmployeeSchema())
8585
.writeValues(b);

0 commit comments

Comments
 (0)