diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ChunkedInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ChunkedInputStream.java index c7ba75bdc7..7bf951e49e 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ChunkedInputStream.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ChunkedInputStream.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.InputStream; +import org.apache.hc.core5.http.Chars; import org.apache.hc.core5.http.ConnectionClosedException; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpException; @@ -37,6 +38,7 @@ import org.apache.hc.core5.http.StreamClosedException; import org.apache.hc.core5.http.TruncatedChunkException; import org.apache.hc.core5.http.config.H1Config; +import org.apache.hc.core5.http.io.EofInputStream; import org.apache.hc.core5.http.io.SessionInputBuffer; import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.CharArrayBuffer; @@ -57,7 +59,7 @@ * @since 4.0 * */ -public class ChunkedInputStream extends InputStream { +public class ChunkedInputStream extends EofInputStream { private static final int CHUNK_LEN = 1; private static final int CHUNK_DATA = 2; @@ -154,6 +156,7 @@ public int read() throws IOException { pos++; if (pos >= chunkSize) { state = CHUNK_CRLF; + consumeEndOfMessageIfBuffered(); } } return b; @@ -190,6 +193,7 @@ public int read (final byte[] b, final int off, final int len) throws IOExceptio pos += bytesRead; if (pos >= chunkSize) { state = CHUNK_CRLF; + consumeEndOfMessageIfBuffered(); } return bytesRead; } @@ -324,4 +328,60 @@ public Header[] getFooters() { return this.footers.clone(); } + private void consumeEndOfMessageIfBuffered() throws IOException { + if (eof || state != CHUNK_CRLF) { + return; + } + + // To avoid blocking, peek at the unread buffered bytes. + // All content has been read if the unread bytes consist of: + // - CR+LF + // - A chunk size of 0, followed by CR+LF + // - Zero or more footers followed by CR+LF + // - An empty line + final int bufferedLen = buffer.getBufferedLen(); + if (bufferedLen < 2 || buffer.peekBuffered(0) != Chars.CR || buffer.peekBuffered(1) != Chars.LF) { + return; + } + + // check if the next buffered line contains all zeros + int cur = 2; + boolean foundZeros = false; + for (; cur < bufferedLen; cur++) { + if (buffer.peekBuffered(cur) == Chars.CR) { + break; + } else if (buffer.peekBuffered(cur) == '0') { + foundZeros = true; + } else { + return; + } + } + if (!foundZeros) { + return; + } + + // check if the remaining buffered data contains an empty line, + // skipping any footers + boolean foundEmptyLine = false; + for (; cur + 3 < bufferedLen; cur++) { + if (buffer.peekBuffered(cur) == Chars.CR && + buffer.peekBuffered(cur + 1) == Chars.LF && + buffer.peekBuffered(cur + 2) == Chars.CR && + buffer.peekBuffered(cur + 3) == Chars.LF) { + foundEmptyLine = true; + break; + } + } + if (foundEmptyLine == false) { + return; + } + + // Read the next chunk header and footers. The operation should not block + nextChunk(); + } + + @Override + public boolean atEof() { + return eof; + } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ContentLengthInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ContentLengthInputStream.java index b2edb15cee..f1f134436a 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ContentLengthInputStream.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ContentLengthInputStream.java @@ -32,6 +32,7 @@ import org.apache.hc.core5.http.ConnectionClosedException; import org.apache.hc.core5.http.StreamClosedException; +import org.apache.hc.core5.http.io.EofInputStream; import org.apache.hc.core5.http.io.SessionInputBuffer; import org.apache.hc.core5.util.Args; @@ -51,7 +52,7 @@ * * @since 4.0 */ -public class ContentLengthInputStream extends InputStream { +public class ContentLengthInputStream extends EofInputStream { private static final int BUFFER_SIZE = 2048; @@ -223,4 +224,9 @@ public long skip(final long n) throws IOException { } return count; } + + @Override + public boolean atEof() { + return pos == contentLength; + } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/SessionInputBufferImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/SessionInputBufferImpl.java index 5e8c03d130..212302ecbf 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/SessionInputBufferImpl.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/SessionInputBufferImpl.java @@ -384,4 +384,14 @@ public HttpTransportMetrics getMetrics() { return this.metrics; } + @Override + public int getBufferedLen() { + return bufferlen - bufferpos; + } + + @Override + public byte peekBuffered(final int offset) { + return buffer[this.bufferpos + offset]; + } + } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofInputStream.java new file mode 100644 index 0000000000..f01b4ab70f --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofInputStream.java @@ -0,0 +1,40 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http.io; + +import java.io.InputStream; + +public abstract class EofInputStream extends InputStream { + + /** + * Tries to determine, without blocking, if the entire content of the stream has already been read + * @return true if the entire content of the stream has already been read, false if either + * the entire content has not been read or a determination cannot be made without blocking + */ + public abstract boolean atEof(); +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofSensorInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofSensorInputStream.java index a00da02509..52e1405273 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofSensorInputStream.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/EofSensorInputStream.java @@ -192,7 +192,7 @@ public void close() throws IOException { private void checkEOF(final int eof) throws IOException { final InputStream toCheckStream = wrappedStream; - if ((toCheckStream != null) && (eof < 0)) { + if ((toCheckStream != null) && (eof < 0 || atEof(toCheckStream))) { try { boolean scws = true; // should close wrapped stream? if (eofWatcher != null) { @@ -207,6 +207,10 @@ private void checkEOF(final int eof) throws IOException { } } + private static boolean atEof(final InputStream toCheckStream) { + return (toCheckStream instanceof EofInputStream) && ((EofInputStream)toCheckStream).atEof(); + } + /** * Detects stream close and notifies the watcher. * There's not much to detect since this is called by {@link #close close}. diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/SessionInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/SessionInputBuffer.java index fe68b77170..107af2932a 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/io/SessionInputBuffer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/SessionInputBuffer.java @@ -144,4 +144,17 @@ public interface SessionInputBuffer { */ HttpTransportMetrics getMetrics(); + /** + * Returns the number of bytes stored in the session buffer + * @return the number of bytes in the session buffer + */ + int getBufferedLen(); + + /** + * Returns the byte stored at the given offset in the session buffer. + * The offset must be >= 0 and < getBufferedLen() - 1. + * @param offset in the buffer + * @return the buffered byte + */ + byte peekBuffered(int offset); } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestChunkCoding.java b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestChunkCoding.java index 04e007fd0c..24b1e937c7 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestChunkCoding.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestChunkCoding.java @@ -67,8 +67,11 @@ public void testChunkedInputStreamLargeBuffer() throws IOException { while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } + Assert.assertTrue(in.atEof()); Assert.assertEquals(-1, in.read(buffer)); + Assert.assertTrue(in.atEof()); Assert.assertEquals(-1, in.read(buffer)); + Assert.assertTrue(in.atEof()); in.close(); @@ -97,8 +100,12 @@ public void testChunkedInputStreamSmallBuffer() throws IOException { while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } + Assert.assertTrue(in.atEof()); Assert.assertEquals(-1, in.read(buffer)); + Assert.assertTrue(in.atEof()); Assert.assertEquals(-1, in.read(buffer)); + Assert.assertTrue(in.atEof()); + in.close(); @@ -124,8 +131,12 @@ public void testChunkedInputStreamOneByteRead() throws IOException { Assert.assertEquals(i, ch); i++; } + Assert.assertTrue(in.atEof()); Assert.assertEquals(-1, in.read()); + Assert.assertTrue(in.atEof()); Assert.assertEquals(-1, in.read()); + Assert.assertTrue(in.atEof()); + in.close(); } @@ -507,5 +518,43 @@ public void testHugeChunk() throws IOException { Assert.assertEquals("01234567", result); } + @Test + public void testAtEof() throws IOException { + final SessionInputBuffer inbuffer = new SessionInputBufferImpl(128); + final String chunkedInput = CHUNKED_INPUT + "\r\n"; + final ByteArrayInputStream inputStream = new ByteArrayInputStream(chunkedInput.getBytes(StandardCharsets.ISO_8859_1)); + final ChunkedInputStream in = new ChunkedInputStream(inbuffer, inputStream); + final byte[] buffer = new byte[64]; + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + int len; + len = in.read(buffer); + Assert.assertEquals(16, len); + Assert.assertFalse(in.atEof()); + out.write(buffer, 0, len); + + len = in.read(buffer); + Assert.assertEquals(5, len); + Assert.assertTrue(in.atEof()); + out.write(buffer, 0, len); + + Assert.assertEquals(-1, in.read(buffer)); + Assert.assertTrue(in.atEof()); + Assert.assertEquals(-1, in.read(buffer)); + Assert.assertTrue(in.atEof()); + + in.close(); + + final String result = new String(out.toByteArray(), StandardCharsets.ISO_8859_1); + Assert.assertEquals(result, CHUNKED_RESULT); + + final Header[] footers = in.getFooters(); + Assert.assertNotNull(footers); + Assert.assertEquals(2, footers.length); + Assert.assertEquals("Footer1", footers[0].getName()); + Assert.assertEquals("abcde", footers[0].getValue()); + Assert.assertEquals("Footer2", footers[1].getName()); + Assert.assertEquals("fghij", footers[1].getValue()); + } + } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestContentLengthInputStream.java b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestContentLengthInputStream.java index f54155ec9a..198dfc320b 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestContentLengthInputStream.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestContentLengthInputStream.java @@ -46,13 +46,15 @@ public void testBasics() throws IOException { final String s = "1234567890123456"; final ByteArrayInputStream inputStream = new ByteArrayInputStream(s.getBytes(StandardCharsets.ISO_8859_1)); final SessionInputBuffer inbuffer = new SessionInputBufferImpl(16); - final InputStream in = new ContentLengthInputStream(inbuffer, inputStream, 10L); + final ContentLengthInputStream in = new ContentLengthInputStream(inbuffer, inputStream, 10L); final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); final byte[] buffer = new byte[50]; int len = in.read(buffer, 0, 2); + Assert.assertFalse(in.atEof()); outputStream.write(buffer, 0, len); len = in.read(buffer); + Assert.assertTrue(in.atEof()); outputStream.write(buffer, 0, len); final String result = new String(outputStream.toByteArray(), StandardCharsets.ISO_8859_1); @@ -64,34 +66,44 @@ public void testBasics() throws IOException { public void testSkip() throws IOException { final ByteArrayInputStream inputStream1 = new ByteArrayInputStream(new byte[20]); final SessionInputBuffer inbuffer1 = new SessionInputBufferImpl(16); - final InputStream in1 = new ContentLengthInputStream(inbuffer1, inputStream1, 10L); + final ContentLengthInputStream in1 = new ContentLengthInputStream(inbuffer1, inputStream1, 10L); Assert.assertEquals(10, in1.skip(10)); + Assert.assertTrue(in1.atEof()); Assert.assertTrue(in1.read() == -1); + Assert.assertTrue(in1.atEof()); in1.close(); final ByteArrayInputStream inputStream2 = new ByteArrayInputStream(new byte[20]); final SessionInputBuffer inbuffer2 = new SessionInputBufferImpl(16); - final InputStream in2 = new ContentLengthInputStream(inbuffer2, inputStream2, 10L); + final ContentLengthInputStream in2 = new ContentLengthInputStream(inbuffer2, inputStream2, 10L); in2.read(); + Assert.assertFalse(in2.atEof()); Assert.assertEquals(9, in2.skip(10)); + Assert.assertTrue(in2.atEof()); Assert.assertTrue(in2.read() == -1); + Assert.assertTrue(in2.atEof()); in2.close(); final ByteArrayInputStream inputStream3 = new ByteArrayInputStream(new byte[20]); final SessionInputBuffer inbuffer3 = new SessionInputBufferImpl(16); - final InputStream in3 = new ContentLengthInputStream(inbuffer3, inputStream3, 2L); + final ContentLengthInputStream in3 = new ContentLengthInputStream(inbuffer3, inputStream3, 2L); in3.read(); + Assert.assertFalse(in3.atEof()); in3.read(); + Assert.assertTrue(in3.atEof()); Assert.assertTrue(in3.skip(10) <= 0); Assert.assertTrue(in3.skip(-1) == 0); Assert.assertTrue(in3.read() == -1); + Assert.assertTrue(in3.atEof()); in3.close(); final ByteArrayInputStream inputStream4 = new ByteArrayInputStream(new byte[20]); final SessionInputBuffer inbuffer4 = new SessionInputBufferImpl(16); - final InputStream in4 = new ContentLengthInputStream(inbuffer4, inputStream4, 10L); + final ContentLengthInputStream in4 = new ContentLengthInputStream(inbuffer4, inputStream4, 10L); Assert.assertEquals(5,in4.skip(5)); + Assert.assertFalse(in4.atEof()); Assert.assertEquals(5, in4.read(new byte[20])); + Assert.assertTrue(in4.atEof()); in4.close(); } @@ -139,24 +151,25 @@ public void testTruncatedContent() throws IOException { final String s = "1234567890123456"; final ByteArrayInputStream inputStream = new ByteArrayInputStream(s.getBytes(StandardCharsets.ISO_8859_1)); final SessionInputBuffer inbuffer = new SessionInputBufferImpl(16); - final InputStream in = new ContentLengthInputStream(inbuffer, inputStream, 32L); + final ContentLengthInputStream in = new ContentLengthInputStream(inbuffer, inputStream, 32L); final byte[] tmp = new byte[32]; final int byteRead = in.read(tmp); Assert.assertEquals(16, byteRead); + Assert.assertFalse(in.atEof()); try { in.read(tmp); - Assert.fail("ConnectionClosedException should have been closed"); + Assert.fail("ConnectionClosedException should have been thrown"); } catch (final ConnectionClosedException ex) { } try { in.read(); - Assert.fail("ConnectionClosedException should have been closed"); + Assert.fail("ConnectionClosedException should have been thrown"); } catch (final ConnectionClosedException ex) { } try { in.close(); - Assert.fail("ConnectionClosedException should have been closed"); + Assert.fail("ConnectionClosedException should have been thrown"); } catch (final ConnectionClosedException ex) { } } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/io/TestEofSensorInputStreamWithEofInputStream.java b/httpcore5/src/test/java/org/apache/hc/core5/http/io/TestEofSensorInputStreamWithEofInputStream.java new file mode 100644 index 0000000000..490feded4c --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/http/io/TestEofSensorInputStreamWithEofInputStream.java @@ -0,0 +1,101 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http.io; + +import java.io.InputStream; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestEofSensorInputStreamWithEofInputStream { + + private EofInputStream instream; + private EofSensorWatcher eofwatcher; + private EofSensorInputStream eofstream; + + @Before + public void setup() throws Exception { + instream = Mockito.mock(EofInputStream.class); + eofwatcher = Mockito.mock(EofSensorWatcher.class); + eofstream = new EofSensorInputStream(instream, eofwatcher); + } + + @Test + public void testReadAtEof() throws Exception { + Mockito.when(instream.read()).thenReturn(0); + Mockito.when(instream.atEof()).thenReturn(true); + Mockito.when(eofwatcher.eofDetected(Mockito.any())).thenReturn(Boolean.TRUE); + + Assert.assertEquals(0, eofstream.read()); + Assert.assertNull(eofstream.getWrappedStream()); + Mockito.verify(instream, Mockito.times(1)).close(); + Mockito.verify(eofwatcher).eofDetected(instream); + } + + @Test + public void testReadNotAtEof() throws Exception { + Mockito.when(instream.read()).thenReturn(0); + Mockito.when(instream.atEof()).thenReturn(false); + Mockito.when(eofwatcher.eofDetected(Mockito.any())).thenReturn(Boolean.TRUE); + + Assert.assertEquals(0, eofstream.read()); + Assert.assertNotNull(eofstream.getWrappedStream()); + Mockito.verify(instream, Mockito.never()).close(); + Mockito.verify(eofwatcher, Mockito.never()).eofDetected(instream); + } + + @Test + public void testReadByteArrayAtEof() throws Exception { + Mockito.when(instream.read(Mockito.any(), Mockito.anyInt(), Mockito.anyInt())) + .thenReturn(1); + Mockito.when(instream.atEof()).thenReturn(true); + Mockito.when(eofwatcher.eofDetected(Mockito.any())).thenReturn(Boolean.TRUE); + + final byte[] tmp = new byte[1]; + + Assert.assertEquals(1, eofstream.read(tmp)); + Assert.assertNull(eofstream.getWrappedStream()); + Mockito.verify(instream, Mockito.times(1)).close(); + Mockito.verify(eofwatcher).eofDetected(instream); + } + + @Test + public void testReadByteArrayNotAtEof() throws Exception { + Mockito.when(instream.read(Mockito.any(), Mockito.anyInt(), Mockito.anyInt())) + .thenReturn(1); + Mockito.when(instream.atEof()).thenReturn(false); + + final byte[] tmp = new byte[1]; + + Assert.assertEquals(1, eofstream.read(tmp)); + Assert.assertNotNull(eofstream.getWrappedStream()); + Mockito.verify(instream, Mockito.never()).close(); + Mockito.verify(eofwatcher, Mockito.never()).eofDetected(instream); + } +}