diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ChunkedInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ChunkedInputStream.java
index c7ba75bdc7..7bf951e49e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ChunkedInputStream.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ChunkedInputStream.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.InputStream;
+import org.apache.hc.core5.http.Chars;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
@@ -37,6 +38,7 @@
import org.apache.hc.core5.http.StreamClosedException;
import org.apache.hc.core5.http.TruncatedChunkException;
import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.io.EofInputStream;
import org.apache.hc.core5.http.io.SessionInputBuffer;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.CharArrayBuffer;
@@ -57,7 +59,7 @@
* @since 4.0
*
*/
-public class ChunkedInputStream extends InputStream {
+public class ChunkedInputStream extends EofInputStream {
private static final int CHUNK_LEN = 1;
private static final int CHUNK_DATA = 2;
@@ -154,6 +156,7 @@ public int read() throws IOException {
pos++;
if (pos >= chunkSize) {
state = CHUNK_CRLF;
+ consumeEndOfMessageIfBuffered();
}
}
return b;
@@ -190,6 +193,7 @@ public int read (final byte[] b, final int off, final int len) throws IOExceptio
pos += bytesRead;
if (pos >= chunkSize) {
state = CHUNK_CRLF;
+ consumeEndOfMessageIfBuffered();
}
return bytesRead;
}
@@ -324,4 +328,60 @@ public Header[] getFooters() {
return this.footers.clone();
}
+ private void consumeEndOfMessageIfBuffered() throws IOException {
+ if (eof || state != CHUNK_CRLF) {
+ return;
+ }
+
+ // To avoid blocking, peek at the unread buffered bytes.
+ // All content has been read if the unread bytes consist of:
+ // - CR+LF
+ // - A chunk size of 0, followed by CR+LF
+ // - Zero or more footers followed by CR+LF
+ // - An empty line
+ final int bufferedLen = buffer.getBufferedLen();
+ if (bufferedLen < 2 || buffer.peekBuffered(0) != Chars.CR || buffer.peekBuffered(1) != Chars.LF) {
+ return;
+ }
+
+ // check if the next buffered line contains all zeros
+ int cur = 2;
+ boolean foundZeros = false;
+ for (; cur < bufferedLen; cur++) {
+ if (buffer.peekBuffered(cur) == Chars.CR) {
+ break;
+ } else if (buffer.peekBuffered(cur) == '0') {
+ foundZeros = true;
+ } else {
+ return;
+ }
+ }
+ if (!foundZeros) {
+ return;
+ }
+
+ // check if the remaining buffered data contains an empty line,
+ // skipping any footers
+ boolean foundEmptyLine = false;
+ for (; cur + 3 < bufferedLen; cur++) {
+ if (buffer.peekBuffered(cur) == Chars.CR &&
+ buffer.peekBuffered(cur + 1) == Chars.LF &&
+ buffer.peekBuffered(cur + 2) == Chars.CR &&
+ buffer.peekBuffered(cur + 3) == Chars.LF) {
+ foundEmptyLine = true;
+ break;
+ }
+ }
+ if (foundEmptyLine == false) {
+ return;
+ }
+
+ // Read the next chunk header and footers. The operation should not block
+ nextChunk();
+ }
+
+ @Override
+ public boolean atEof() {
+ return eof;
+ }
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ContentLengthInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ContentLengthInputStream.java
index b2edb15cee..f1f134436a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ContentLengthInputStream.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ContentLengthInputStream.java
@@ -32,6 +32,7 @@
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.StreamClosedException;
+import org.apache.hc.core5.http.io.EofInputStream;
import org.apache.hc.core5.http.io.SessionInputBuffer;
import org.apache.hc.core5.util.Args;
@@ -51,7 +52,7 @@
*
* @since 4.0
*/
-public class ContentLengthInputStream extends InputStream {
+public class ContentLengthInputStream extends EofInputStream {
private static final int BUFFER_SIZE = 2048;
@@ -223,4 +224,9 @@ public long skip(final long n) throws IOException {
}
return count;
}
+
+ @Override
+ public boolean atEof() {
+ return pos == contentLength;
+ }
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/SessionInputBufferImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/SessionInputBufferImpl.java
index 5e8c03d130..212302ecbf 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/SessionInputBufferImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/SessionInputBufferImpl.java
@@ -384,4 +384,14 @@ public HttpTransportMetrics getMetrics() {
return this.metrics;
}
+ @Override
+ public int getBufferedLen() {
+ return bufferlen - bufferpos;
+ }
+
+ @Override
+ public byte peekBuffered(final int offset) {
+ return buffer[this.bufferpos + offset];
+ }
+
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofInputStream.java
new file mode 100644
index 0000000000..f01b4ab70f
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofInputStream.java
@@ -0,0 +1,40 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package org.apache.hc.core5.http.io;
+
+import java.io.InputStream;
+
+public abstract class EofInputStream extends InputStream {
+
+ /**
+ * Tries to determine, without blocking, if the entire content of the stream has already been read
+ * @return true if the entire content of the stream has already been read, false if either
+ * the entire content has not been read or a determination cannot be made without blocking
+ */
+ public abstract boolean atEof();
+}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofSensorInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofSensorInputStream.java
index a00da02509..52e1405273 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofSensorInputStream.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofSensorInputStream.java
@@ -192,7 +192,7 @@ public void close() throws IOException {
private void checkEOF(final int eof) throws IOException {
final InputStream toCheckStream = wrappedStream;
- if ((toCheckStream != null) && (eof < 0)) {
+ if ((toCheckStream != null) && (eof < 0 || atEof(toCheckStream))) {
try {
boolean scws = true; // should close wrapped stream?
if (eofWatcher != null) {
@@ -207,6 +207,10 @@ private void checkEOF(final int eof) throws IOException {
}
}
+ private static boolean atEof(final InputStream toCheckStream) {
+ return (toCheckStream instanceof EofInputStream) && ((EofInputStream)toCheckStream).atEof();
+ }
+
/**
* Detects stream close and notifies the watcher.
* There's not much to detect since this is called by {@link #close close}.
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/SessionInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/SessionInputBuffer.java
index fe68b77170..107af2932a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/io/SessionInputBuffer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/SessionInputBuffer.java
@@ -144,4 +144,17 @@ public interface SessionInputBuffer {
*/
HttpTransportMetrics getMetrics();
+ /**
+ * Returns the number of bytes stored in the session buffer
+ * @return the number of bytes in the session buffer
+ */
+ int getBufferedLen();
+
+ /**
+ * Returns the byte stored at the given offset in the session buffer.
+ * The offset must be >= 0 and < getBufferedLen() - 1.
+ * @param offset in the buffer
+ * @return the buffered byte
+ */
+ byte peekBuffered(int offset);
}
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestChunkCoding.java b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestChunkCoding.java
index 04e007fd0c..24b1e937c7 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestChunkCoding.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestChunkCoding.java
@@ -67,8 +67,11 @@ public void testChunkedInputStreamLargeBuffer() throws IOException {
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
+ Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
+ Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
+ Assert.assertTrue(in.atEof());
in.close();
@@ -97,8 +100,12 @@ public void testChunkedInputStreamSmallBuffer() throws IOException {
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
+ Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
+ Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
+ Assert.assertTrue(in.atEof());
+
in.close();
@@ -124,8 +131,12 @@ public void testChunkedInputStreamOneByteRead() throws IOException {
Assert.assertEquals(i, ch);
i++;
}
+ Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read());
+ Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read());
+ Assert.assertTrue(in.atEof());
+
in.close();
}
@@ -507,5 +518,43 @@ public void testHugeChunk() throws IOException {
Assert.assertEquals("01234567", result);
}
+ @Test
+ public void testAtEof() throws IOException {
+ final SessionInputBuffer inbuffer = new SessionInputBufferImpl(128);
+ final String chunkedInput = CHUNKED_INPUT + "\r\n";
+ final ByteArrayInputStream inputStream = new ByteArrayInputStream(chunkedInput.getBytes(StandardCharsets.ISO_8859_1));
+ final ChunkedInputStream in = new ChunkedInputStream(inbuffer, inputStream);
+ final byte[] buffer = new byte[64];
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int len;
+ len = in.read(buffer);
+ Assert.assertEquals(16, len);
+ Assert.assertFalse(in.atEof());
+ out.write(buffer, 0, len);
+
+ len = in.read(buffer);
+ Assert.assertEquals(5, len);
+ Assert.assertTrue(in.atEof());
+ out.write(buffer, 0, len);
+
+ Assert.assertEquals(-1, in.read(buffer));
+ Assert.assertTrue(in.atEof());
+ Assert.assertEquals(-1, in.read(buffer));
+ Assert.assertTrue(in.atEof());
+
+ in.close();
+
+ final String result = new String(out.toByteArray(), StandardCharsets.ISO_8859_1);
+ Assert.assertEquals(result, CHUNKED_RESULT);
+
+ final Header[] footers = in.getFooters();
+ Assert.assertNotNull(footers);
+ Assert.assertEquals(2, footers.length);
+ Assert.assertEquals("Footer1", footers[0].getName());
+ Assert.assertEquals("abcde", footers[0].getValue());
+ Assert.assertEquals("Footer2", footers[1].getName());
+ Assert.assertEquals("fghij", footers[1].getValue());
+ }
+
}
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestContentLengthInputStream.java b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestContentLengthInputStream.java
index f54155ec9a..198dfc320b 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestContentLengthInputStream.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestContentLengthInputStream.java
@@ -46,13 +46,15 @@ public void testBasics() throws IOException {
final String s = "1234567890123456";
final ByteArrayInputStream inputStream = new ByteArrayInputStream(s.getBytes(StandardCharsets.ISO_8859_1));
final SessionInputBuffer inbuffer = new SessionInputBufferImpl(16);
- final InputStream in = new ContentLengthInputStream(inbuffer, inputStream, 10L);
+ final ContentLengthInputStream in = new ContentLengthInputStream(inbuffer, inputStream, 10L);
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final byte[] buffer = new byte[50];
int len = in.read(buffer, 0, 2);
+ Assert.assertFalse(in.atEof());
outputStream.write(buffer, 0, len);
len = in.read(buffer);
+ Assert.assertTrue(in.atEof());
outputStream.write(buffer, 0, len);
final String result = new String(outputStream.toByteArray(), StandardCharsets.ISO_8859_1);
@@ -64,34 +66,44 @@ public void testBasics() throws IOException {
public void testSkip() throws IOException {
final ByteArrayInputStream inputStream1 = new ByteArrayInputStream(new byte[20]);
final SessionInputBuffer inbuffer1 = new SessionInputBufferImpl(16);
- final InputStream in1 = new ContentLengthInputStream(inbuffer1, inputStream1, 10L);
+ final ContentLengthInputStream in1 = new ContentLengthInputStream(inbuffer1, inputStream1, 10L);
Assert.assertEquals(10, in1.skip(10));
+ Assert.assertTrue(in1.atEof());
Assert.assertTrue(in1.read() == -1);
+ Assert.assertTrue(in1.atEof());
in1.close();
final ByteArrayInputStream inputStream2 = new ByteArrayInputStream(new byte[20]);
final SessionInputBuffer inbuffer2 = new SessionInputBufferImpl(16);
- final InputStream in2 = new ContentLengthInputStream(inbuffer2, inputStream2, 10L);
+ final ContentLengthInputStream in2 = new ContentLengthInputStream(inbuffer2, inputStream2, 10L);
in2.read();
+ Assert.assertFalse(in2.atEof());
Assert.assertEquals(9, in2.skip(10));
+ Assert.assertTrue(in2.atEof());
Assert.assertTrue(in2.read() == -1);
+ Assert.assertTrue(in2.atEof());
in2.close();
final ByteArrayInputStream inputStream3 = new ByteArrayInputStream(new byte[20]);
final SessionInputBuffer inbuffer3 = new SessionInputBufferImpl(16);
- final InputStream in3 = new ContentLengthInputStream(inbuffer3, inputStream3, 2L);
+ final ContentLengthInputStream in3 = new ContentLengthInputStream(inbuffer3, inputStream3, 2L);
in3.read();
+ Assert.assertFalse(in3.atEof());
in3.read();
+ Assert.assertTrue(in3.atEof());
Assert.assertTrue(in3.skip(10) <= 0);
Assert.assertTrue(in3.skip(-1) == 0);
Assert.assertTrue(in3.read() == -1);
+ Assert.assertTrue(in3.atEof());
in3.close();
final ByteArrayInputStream inputStream4 = new ByteArrayInputStream(new byte[20]);
final SessionInputBuffer inbuffer4 = new SessionInputBufferImpl(16);
- final InputStream in4 = new ContentLengthInputStream(inbuffer4, inputStream4, 10L);
+ final ContentLengthInputStream in4 = new ContentLengthInputStream(inbuffer4, inputStream4, 10L);
Assert.assertEquals(5,in4.skip(5));
+ Assert.assertFalse(in4.atEof());
Assert.assertEquals(5, in4.read(new byte[20]));
+ Assert.assertTrue(in4.atEof());
in4.close();
}
@@ -139,24 +151,25 @@ public void testTruncatedContent() throws IOException {
final String s = "1234567890123456";
final ByteArrayInputStream inputStream = new ByteArrayInputStream(s.getBytes(StandardCharsets.ISO_8859_1));
final SessionInputBuffer inbuffer = new SessionInputBufferImpl(16);
- final InputStream in = new ContentLengthInputStream(inbuffer, inputStream, 32L);
+ final ContentLengthInputStream in = new ContentLengthInputStream(inbuffer, inputStream, 32L);
final byte[] tmp = new byte[32];
final int byteRead = in.read(tmp);
Assert.assertEquals(16, byteRead);
+ Assert.assertFalse(in.atEof());
try {
in.read(tmp);
- Assert.fail("ConnectionClosedException should have been closed");
+ Assert.fail("ConnectionClosedException should have been thrown");
} catch (final ConnectionClosedException ex) {
}
try {
in.read();
- Assert.fail("ConnectionClosedException should have been closed");
+ Assert.fail("ConnectionClosedException should have been thrown");
} catch (final ConnectionClosedException ex) {
}
try {
in.close();
- Assert.fail("ConnectionClosedException should have been closed");
+ Assert.fail("ConnectionClosedException should have been thrown");
} catch (final ConnectionClosedException ex) {
}
}
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/io/TestEofSensorInputStreamWithEofInputStream.java b/httpcore5/src/test/java/org/apache/hc/core5/http/io/TestEofSensorInputStreamWithEofInputStream.java
new file mode 100644
index 0000000000..490feded4c
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/io/TestEofSensorInputStreamWithEofInputStream.java
@@ -0,0 +1,101 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.http.io;
+
+import java.io.InputStream;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestEofSensorInputStreamWithEofInputStream {
+
+ private EofInputStream instream;
+ private EofSensorWatcher eofwatcher;
+ private EofSensorInputStream eofstream;
+
+ @Before
+ public void setup() throws Exception {
+ instream = Mockito.mock(EofInputStream.class);
+ eofwatcher = Mockito.mock(EofSensorWatcher.class);
+ eofstream = new EofSensorInputStream(instream, eofwatcher);
+ }
+
+ @Test
+ public void testReadAtEof() throws Exception {
+ Mockito.when(instream.read()).thenReturn(0);
+ Mockito.when(instream.atEof()).thenReturn(true);
+ Mockito.when(eofwatcher.eofDetected(Mockito.any())).thenReturn(Boolean.TRUE);
+
+ Assert.assertEquals(0, eofstream.read());
+ Assert.assertNull(eofstream.getWrappedStream());
+ Mockito.verify(instream, Mockito.times(1)).close();
+ Mockito.verify(eofwatcher).eofDetected(instream);
+ }
+
+ @Test
+ public void testReadNotAtEof() throws Exception {
+ Mockito.when(instream.read()).thenReturn(0);
+ Mockito.when(instream.atEof()).thenReturn(false);
+ Mockito.when(eofwatcher.eofDetected(Mockito.any())).thenReturn(Boolean.TRUE);
+
+ Assert.assertEquals(0, eofstream.read());
+ Assert.assertNotNull(eofstream.getWrappedStream());
+ Mockito.verify(instream, Mockito.never()).close();
+ Mockito.verify(eofwatcher, Mockito.never()).eofDetected(instream);
+ }
+
+ @Test
+ public void testReadByteArrayAtEof() throws Exception {
+ Mockito.when(instream.read(Mockito.any(), Mockito.anyInt(), Mockito.anyInt()))
+ .thenReturn(1);
+ Mockito.when(instream.atEof()).thenReturn(true);
+ Mockito.when(eofwatcher.eofDetected(Mockito.any())).thenReturn(Boolean.TRUE);
+
+ final byte[] tmp = new byte[1];
+
+ Assert.assertEquals(1, eofstream.read(tmp));
+ Assert.assertNull(eofstream.getWrappedStream());
+ Mockito.verify(instream, Mockito.times(1)).close();
+ Mockito.verify(eofwatcher).eofDetected(instream);
+ }
+
+ @Test
+ public void testReadByteArrayNotAtEof() throws Exception {
+ Mockito.when(instream.read(Mockito.any(), Mockito.anyInt(), Mockito.anyInt()))
+ .thenReturn(1);
+ Mockito.when(instream.atEof()).thenReturn(false);
+
+ final byte[] tmp = new byte[1];
+
+ Assert.assertEquals(1, eofstream.read(tmp));
+ Assert.assertNotNull(eofstream.getWrappedStream());
+ Mockito.verify(instream, Mockito.never()).close();
+ Mockito.verify(eofwatcher, Mockito.never()).eofDetected(instream);
+ }
+}