Skip to content

Improve connection reuse logic #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import java.io.IOException;
import java.io.InputStream;

import org.apache.hc.core5.http.Chars;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.MalformedChunkCodingException;
import org.apache.hc.core5.http.StreamClosedException;
import org.apache.hc.core5.http.TruncatedChunkException;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.io.EofInputStream;
import org.apache.hc.core5.http.io.SessionInputBuffer;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.CharArrayBuffer;
Expand All @@ -57,7 +59,7 @@
* @since 4.0
*
*/
public class ChunkedInputStream extends InputStream {
public class ChunkedInputStream extends EofInputStream {

private static final int CHUNK_LEN = 1;
private static final int CHUNK_DATA = 2;
Expand Down Expand Up @@ -154,6 +156,7 @@ public int read() throws IOException {
pos++;
if (pos >= chunkSize) {
state = CHUNK_CRLF;
consumeEndOfMessageIfBuffered();
}
}
return b;
Expand Down Expand Up @@ -190,6 +193,7 @@ public int read (final byte[] b, final int off, final int len) throws IOExceptio
pos += bytesRead;
if (pos >= chunkSize) {
state = CHUNK_CRLF;
consumeEndOfMessageIfBuffered();
}
return bytesRead;
}
Expand Down Expand Up @@ -324,4 +328,60 @@ public Header[] getFooters() {
return this.footers.clone();
}

private void consumeEndOfMessageIfBuffered() throws IOException {
if (eof || state != CHUNK_CRLF) {
return;
}

// To avoid blocking, peek at the unread buffered bytes.
// All content has been read if the unread bytes consist of:
// - CR+LF
// - A chunk size of 0, followed by CR+LF
// - Zero or more footers followed by CR+LF
// - An empty line
final int bufferedLen = buffer.getBufferedLen();
if (bufferedLen < 2 || buffer.peekBuffered(0) != Chars.CR || buffer.peekBuffered(1) != Chars.LF) {
return;
}

// check if the next buffered line contains all zeros
int cur = 2;
boolean foundZeros = false;
for (; cur < bufferedLen; cur++) {
if (buffer.peekBuffered(cur) == Chars.CR) {
break;
} else if (buffer.peekBuffered(cur) == '0') {
foundZeros = true;
} else {
return;
}
}
if (!foundZeros) {
return;
}

// check if the remaining buffered data contains an empty line,
// skipping any footers
boolean foundEmptyLine = false;
for (; cur + 3 < bufferedLen; cur++) {
if (buffer.peekBuffered(cur) == Chars.CR &&
buffer.peekBuffered(cur + 1) == Chars.LF &&
buffer.peekBuffered(cur + 2) == Chars.CR &&
buffer.peekBuffered(cur + 3) == Chars.LF) {
foundEmptyLine = true;
break;
}
}
if (foundEmptyLine == false) {
return;
}

// Read the next chunk header and footers. The operation should not block
nextChunk();
}

@Override
public boolean atEof() {
return eof;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.StreamClosedException;
import org.apache.hc.core5.http.io.EofInputStream;
import org.apache.hc.core5.http.io.SessionInputBuffer;
import org.apache.hc.core5.util.Args;

Expand All @@ -51,7 +52,7 @@
*
* @since 4.0
*/
public class ContentLengthInputStream extends InputStream {
public class ContentLengthInputStream extends EofInputStream {

private static final int BUFFER_SIZE = 2048;

Expand Down Expand Up @@ -223,4 +224,9 @@ public long skip(final long n) throws IOException {
}
return count;
}

@Override
public boolean atEof() {
return pos == contentLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,14 @@ public HttpTransportMetrics getMetrics() {
return this.metrics;
}

@Override
public int getBufferedLen() {
return bufferlen - bufferpos;
}

@Override
public byte peekBuffered(final int offset) {
return buffer[this.bufferpos + offset];
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.http.io;

import java.io.InputStream;

public abstract class EofInputStream extends InputStream {

/**
* Tries to determine, without blocking, if the entire content of the stream has already been read
* @return true if the entire content of the stream has already been read, false if either
* the entire content has not been read or a determination cannot be made without blocking
*/
public abstract boolean atEof();
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void close() throws IOException {
private void checkEOF(final int eof) throws IOException {

final InputStream toCheckStream = wrappedStream;
if ((toCheckStream != null) && (eof < 0)) {
if ((toCheckStream != null) && (eof < 0 || atEof(toCheckStream))) {
try {
boolean scws = true; // should close wrapped stream?
if (eofWatcher != null) {
Expand All @@ -207,6 +207,10 @@ private void checkEOF(final int eof) throws IOException {
}
}

private static boolean atEof(final InputStream toCheckStream) {
return (toCheckStream instanceof EofInputStream) && ((EofInputStream)toCheckStream).atEof();
}

/**
* Detects stream close and notifies the watcher.
* There's not much to detect since this is called by {@link #close close}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,17 @@ public interface SessionInputBuffer {
*/
HttpTransportMetrics getMetrics();

/**
* Returns the number of bytes stored in the session buffer
* @return the number of bytes in the session buffer
*/
int getBufferedLen();

/**
* Returns the byte stored at the given offset in the session buffer.
* The offset must be >= 0 and < getBufferedLen() - 1.
* @param offset in the buffer
* @return the buffered byte
*/
byte peekBuffered(int offset);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ public void testChunkedInputStreamLargeBuffer() throws IOException {
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
Assert.assertTrue(in.atEof());

in.close();

Expand Down Expand Up @@ -97,8 +100,12 @@ public void testChunkedInputStreamSmallBuffer() throws IOException {
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
Assert.assertTrue(in.atEof());


in.close();

Expand All @@ -124,8 +131,12 @@ public void testChunkedInputStreamOneByteRead() throws IOException {
Assert.assertEquals(i, ch);
i++;
}
Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read());
Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read());
Assert.assertTrue(in.atEof());


in.close();
}
Expand Down Expand Up @@ -507,5 +518,43 @@ public void testHugeChunk() throws IOException {
Assert.assertEquals("01234567", result);
}

@Test
public void testAtEof() throws IOException {
final SessionInputBuffer inbuffer = new SessionInputBufferImpl(128);
final String chunkedInput = CHUNKED_INPUT + "\r\n";
final ByteArrayInputStream inputStream = new ByteArrayInputStream(chunkedInput.getBytes(StandardCharsets.ISO_8859_1));
final ChunkedInputStream in = new ChunkedInputStream(inbuffer, inputStream);
final byte[] buffer = new byte[64];
final ByteArrayOutputStream out = new ByteArrayOutputStream();
int len;
len = in.read(buffer);
Assert.assertEquals(16, len);
Assert.assertFalse(in.atEof());
out.write(buffer, 0, len);

len = in.read(buffer);
Assert.assertEquals(5, len);
Assert.assertTrue(in.atEof());
out.write(buffer, 0, len);

Assert.assertEquals(-1, in.read(buffer));
Assert.assertTrue(in.atEof());
Assert.assertEquals(-1, in.read(buffer));
Assert.assertTrue(in.atEof());

in.close();

final String result = new String(out.toByteArray(), StandardCharsets.ISO_8859_1);
Assert.assertEquals(result, CHUNKED_RESULT);

final Header[] footers = in.getFooters();
Assert.assertNotNull(footers);
Assert.assertEquals(2, footers.length);
Assert.assertEquals("Footer1", footers[0].getName());
Assert.assertEquals("abcde", footers[0].getValue());
Assert.assertEquals("Footer2", footers[1].getName());
Assert.assertEquals("fghij", footers[1].getValue());
}

}

Loading