Skip to content

Commit 4468fd6

Browse files
committed
Stream - Add ReaderInputStream
1 parent b8866fe commit 4468fd6

File tree

4 files changed

+305
-46
lines changed

4 files changed

+305
-46
lines changed

jsonb/src/main/java/io/avaje/jsonb/stream/JsonStream.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,8 @@ public JsonReader reader(byte[] json) {
5050

5151
@Override
5252
public JsonReader reader(Reader reader) {
53-
return null;
54-
// try {
55-
// return new JacksonReader(jsonFactory.createParser(reader), failOnUnknown);
56-
// } catch (IOException e) {
57-
// throw new JsonIoException(e);
58-
// }
53+
// TODO: Could recycle encoder and buffer
54+
return reader(new ReaderInputStream(reader, StandardCharsets.UTF_8));
5955
}
6056

6157
@Override
@@ -70,6 +66,7 @@ public JsonReader reader(InputStream inputStream) {
7066

7167
@Override
7268
public JsonWriter writer(Writer writer) {
69+
// TODO: Could recycle buffer used
7370
return writer(new WriterOutputStream(writer, StandardCharsets.UTF_8));
7471
}
7572

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.avaje.jsonb.stream;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.Reader;
22+
import java.nio.ByteBuffer;
23+
import java.nio.CharBuffer;
24+
import java.nio.charset.Charset;
25+
import java.nio.charset.CharsetEncoder;
26+
import java.nio.charset.CoderResult;
27+
import java.nio.charset.CodingErrorAction;
28+
import java.util.Objects;
29+
30+
/**
31+
* {@link InputStream} implementation that reads a character stream from a {@link Reader} and transforms it to a byte
32+
* stream using a specified charset encoding. The stream is transformed using a {@link CharsetEncoder} object,
33+
* guaranteeing that all charset encodings supported by the JRE are handled correctly. In particular for charsets such
34+
* as UTF-16, the implementation ensures that one and only one byte order marker is produced.
35+
* <p>
36+
* Since in general it is not possible to predict the number of characters to be read from the {@link Reader} to satisfy
37+
* a read request on the {@link ReaderInputStream}, all reads from the {@link Reader} are buffered. There is therefore
38+
* no well defined correlation between the current position of the {@link Reader} and that of the
39+
* {@link ReaderInputStream}. This also implies that in general there is no need to wrap the underlying {@link Reader}
40+
* in a {@link java.io.BufferedReader}.
41+
* </p>
42+
* <p>
43+
* {@link ReaderInputStream} implements the inverse transformation of {@link java.io.InputStreamReader}; in the
44+
* following example, reading from {@code in2} would return the same byte sequence as reading from {@code in} (provided
45+
* that the initial byte sequence is legal with respect to the charset encoding):
46+
* </p>
47+
*
48+
* <pre>
49+
* InputStream inputStream = ...
50+
* Charset cs = ...
51+
* InputStreamReader reader = new InputStreamReader(inputStream, cs);
52+
* ReaderInputStream in2 = new ReaderInputStream(reader, cs);
53+
* </pre>
54+
* <p>
55+
* {@link ReaderInputStream} implements the same transformation as {@link java.io.OutputStreamWriter}, except that the
56+
* control flow is reversed: both classes transform a character stream into a byte stream, but
57+
* {@link java.io.OutputStreamWriter} pushes data to the underlying stream, while {@link ReaderInputStream} pulls it
58+
* from the underlying stream.
59+
* </p>
60+
* <p>
61+
* Note that while there are use cases where there is no alternative to using this class, very often the need to use
62+
* this class is an indication of a flaw in the design of the code. This class is typically used in situations where an
63+
* existing API only accepts an {@link InputStream}, but where the most natural way to produce the data is as a
64+
* character stream, i.e. by providing a {@link Reader} instance.
65+
* </p>
66+
* <p>
67+
* The {@link #available()} method of this class always returns 0. The methods {@link #mark(int)} and {@link #reset()}
68+
* are not supported.
69+
* </p>
70+
* <p>
71+
* Instances of {@link ReaderInputStream} are not thread safe.
72+
* </p>
73+
*/
74+
final class ReaderInputStream extends InputStream {
75+
76+
private static final int EOF = -1;
77+
private static final int DEFAULT_BUFFER_SIZE = 1024;
78+
79+
static int checkMinBufferSize(final CharsetEncoder charsetEncoder, final int bufferSize) {
80+
final float minRequired = minBufferSize(charsetEncoder);
81+
if (bufferSize < minRequired) {
82+
throw new IllegalArgumentException(
83+
String.format("Buffer size %,d must be at least %s for a CharsetEncoder %s.", bufferSize, minRequired, charsetEncoder.charset().displayName()));
84+
}
85+
return bufferSize;
86+
}
87+
88+
static float minBufferSize(final CharsetEncoder charsetEncoder) {
89+
return charsetEncoder.maxBytesPerChar() * 2;
90+
}
91+
92+
private final Reader reader;
93+
94+
private final CharsetEncoder charsetEncoder;
95+
96+
/**
97+
* CharBuffer used as input for the decoder. It should be reasonably large as we read data from the underlying Reader
98+
* into this buffer.
99+
*/
100+
private final CharBuffer encoderIn;
101+
102+
/**
103+
* ByteBuffer used as output for the decoder. This buffer can be small as it is only used to transfer data from the
104+
* decoder to the buffer provided by the caller.
105+
*/
106+
private final ByteBuffer encoderOut;
107+
108+
private CoderResult lastCoderResult;
109+
110+
private boolean endOfInput;
111+
112+
/**
113+
* Constructs a new {@link ReaderInputStream} with a default input buffer size of {@value #DEFAULT_BUFFER_SIZE}
114+
* characters.
115+
*
116+
* <p>
117+
* The encoder created for the specified charset will use {@link CodingErrorAction#REPLACE} for malformed input
118+
* and unmappable characters.
119+
* </p>
120+
*
121+
* @param reader the target {@link Reader}
122+
* @param charset the charset encoding
123+
*/
124+
public ReaderInputStream(final Reader reader, final Charset charset) {
125+
this(reader, charset, DEFAULT_BUFFER_SIZE);
126+
}
127+
128+
/**
129+
* Constructs a new {@link ReaderInputStream}.
130+
*
131+
* @param reader the target {@link Reader}.
132+
* @param charset the charset encoding.
133+
* @param bufferSize the size of the input buffer in characters.
134+
*/
135+
public ReaderInputStream(final Reader reader, final Charset charset, final int bufferSize) {
136+
this(reader, charset.newEncoder(), bufferSize);
137+
}
138+
139+
/**
140+
* Constructs a new {@link ReaderInputStream}.
141+
*
142+
* <p>
143+
* This constructor does not call {@link CharsetEncoder#reset() reset} on the provided encoder. The caller
144+
* of this constructor should do this when providing an encoder which had already been in use.
145+
* </p>
146+
*
147+
* @param reader the target {@link Reader}
148+
* @param charsetEncoder the charset encoder
149+
* @since 2.1
150+
*/
151+
public ReaderInputStream(final Reader reader, final CharsetEncoder charsetEncoder) {
152+
this(reader, charsetEncoder, DEFAULT_BUFFER_SIZE);
153+
}
154+
155+
/**
156+
* Constructs a new {@link ReaderInputStream}.
157+
*
158+
* <p>
159+
* This constructor does not call {@link CharsetEncoder#reset() reset} on the provided encoder. The caller
160+
* of this constructor should do this when providing an encoder which had already been in use.
161+
* </p>
162+
*
163+
* @param reader the target {@link Reader}
164+
* @param charsetEncoder the charset encoder, null defauls to the default Charset encoder.
165+
* @param bufferSize the size of the input buffer in number of characters
166+
* @since 2.1
167+
*/
168+
public ReaderInputStream(final Reader reader, final CharsetEncoder charsetEncoder, final int bufferSize) {
169+
this.reader = reader;
170+
this.charsetEncoder = charsetEncoder;
171+
this.encoderIn = CharBuffer.allocate(checkMinBufferSize(this.charsetEncoder, bufferSize));
172+
this.encoderIn.flip();
173+
this.encoderOut = ByteBuffer.allocate(128);
174+
this.encoderOut.flip();
175+
}
176+
177+
/**
178+
* Close the stream. This method will cause the underlying {@link Reader} to be closed.
179+
*
180+
* @throws IOException if an I/O error occurs.
181+
*/
182+
@Override
183+
public void close() throws IOException {
184+
reader.close();
185+
}
186+
187+
/**
188+
* Fills the internal char buffer from the reader.
189+
*
190+
* @throws IOException If an I/O error occurs
191+
*/
192+
private void fillBuffer() throws IOException {
193+
if (!endOfInput && (lastCoderResult == null || lastCoderResult.isUnderflow())) {
194+
encoderIn.compact();
195+
final int position = encoderIn.position();
196+
// We don't use Reader#read(CharBuffer) here because it is more efficient
197+
// to write directly to the underlying char array (the default implementation
198+
// copies data to a temporary char array).
199+
final int c = reader.read(encoderIn.array(), position, encoderIn.remaining());
200+
if (c == EOF) {
201+
endOfInput = true;
202+
} else {
203+
encoderIn.position(position + c);
204+
}
205+
encoderIn.flip();
206+
}
207+
encoderOut.compact();
208+
lastCoderResult = charsetEncoder.encode(encoderIn, encoderOut, endOfInput);
209+
if (endOfInput) {
210+
lastCoderResult = charsetEncoder.flush(encoderOut);
211+
}
212+
if (lastCoderResult.isError()) {
213+
lastCoderResult.throwException();
214+
}
215+
encoderOut.flip();
216+
}
217+
218+
/**
219+
* Read a single byte.
220+
*
221+
* @return either the byte read or {@code -1} if the end of the stream has been reached
222+
* @throws IOException if an I/O error occurs.
223+
*/
224+
@Override
225+
public int read() throws IOException {
226+
for (;;) {
227+
if (encoderOut.hasRemaining()) {
228+
return encoderOut.get() & 0xFF;
229+
}
230+
fillBuffer();
231+
if (endOfInput && !encoderOut.hasRemaining()) {
232+
return EOF;
233+
}
234+
}
235+
}
236+
237+
/**
238+
* Read the specified number of bytes into an array.
239+
*
240+
* @param b the byte array to read into
241+
* @return the number of bytes read or {@code -1} if the end of the stream has been reached
242+
* @throws IOException if an I/O error occurs.
243+
*/
244+
@Override
245+
public int read(final byte[] b) throws IOException {
246+
return read(b, 0, b.length);
247+
}
248+
249+
/**
250+
* Read the specified number of bytes into an array.
251+
*
252+
* @param array the byte array to read into
253+
* @param off the offset to start reading bytes into
254+
* @param len the number of bytes to read
255+
* @return the number of bytes read or {@code -1} if the end of the stream has been reached
256+
* @throws IOException if an I/O error occurs.
257+
*/
258+
@Override
259+
public int read(final byte[] array, int off, int len) throws IOException {
260+
Objects.requireNonNull(array, "array");
261+
if (len < 0 || off < 0 || (off + len) > array.length) {
262+
throw new IndexOutOfBoundsException("Array size=" + array.length + ", offset=" + off + ", length=" + len);
263+
}
264+
int read = 0;
265+
if (len == 0) {
266+
return 0; // Always return 0 if len == 0
267+
}
268+
while (len > 0) {
269+
if (encoderOut.hasRemaining()) { // Data from the last read not fully copied
270+
final int c = Math.min(encoderOut.remaining(), len);
271+
encoderOut.get(array, off, c);
272+
off += c;
273+
len -= c;
274+
read += c;
275+
} else if (endOfInput) { // Already reach EOF in the last read
276+
break;
277+
} else { // Read again
278+
fillBuffer();
279+
}
280+
}
281+
return read == 0 && endOfInput ? EOF : read;
282+
}
283+
}
284+

jsonb/src/main/java/io/avaje/jsonb/stream/WriterOutputStream.java

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,7 @@ public WriterOutputStream(final Writer writer, final Charset charset) {
153153
public WriterOutputStream(final Writer writer, final Charset charset, final int bufferSize,
154154
final boolean writeImmediately) {
155155
this(writer,
156-
charset.newDecoder()
157-
.onMalformedInput(CodingErrorAction.REPLACE)
158-
.onUnmappableCharacter(CodingErrorAction.REPLACE)
159-
.replaceWith("?"),
156+
charset.newDecoder(),
160157
bufferSize,
161158
writeImmediately);
162159
}
@@ -196,35 +193,6 @@ public WriterOutputStream(final Writer writer, final CharsetDecoder decoder, fin
196193
decoderOut = CharBuffer.allocate(bufferSize);
197194
}
198195

199-
/**
200-
* Constructs a new {@link WriterOutputStream} with a default output buffer size of {@value #BUFFER_SIZE}
201-
* characters. The output buffer will only be flushed when it overflows or when {@link #flush()} or {@link #close()}
202-
* is called.
203-
*
204-
* @param writer the target {@link Writer}
205-
* @param charsetName the name of the charset encoding
206-
*/
207-
public WriterOutputStream(final Writer writer, final String charsetName) {
208-
this(writer, charsetName, BUFFER_SIZE, false);
209-
}
210-
211-
/**
212-
* Constructs a new {@link WriterOutputStream}.
213-
*
214-
* @param writer the target {@link Writer}
215-
* @param charsetName the name of the charset encoding
216-
* @param bufferSize the size of the output buffer in number of characters
217-
* @param writeImmediately If {@code true} the output buffer will be flushed after each
218-
* write operation, i.e. all available data will be written to the
219-
* underlying {@link Writer} immediately. If {@code false}, the
220-
* output buffer will only be flushed when it overflows or when
221-
* {@link #flush()} or {@link #close()} is called.
222-
*/
223-
public WriterOutputStream(final Writer writer, final String charsetName, final int bufferSize,
224-
final boolean writeImmediately) {
225-
this(writer, Charset.forName(charsetName), bufferSize, writeImmediately);
226-
}
227-
228196
/**
229197
* Close the stream. Any remaining content accumulated in the output buffer
230198
* will be written to the underlying {@link Writer}. After that

0 commit comments

Comments
 (0)