diff --git a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java index d392cb1fc8..4302ffdfee 100644 --- a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java +++ b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java @@ -44,7 +44,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Closeable; -import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -64,7 +63,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; - +import javax.ws.rs.core.Response.StatusType; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; @@ -100,6 +99,7 @@ import org.apache.http.config.ConnectionConfig; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ConnectionReleaseTrigger; import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.conn.ManagedHttpClientConnection; import org.apache.http.conn.routing.HttpRoute; @@ -430,8 +430,8 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing final HttpUriRequest request = getUriHttpRequest(clientRequest); final Map clientHeadersSnapshot = writeOutBoundHeaders(clientRequest.getHeaders(), request); + CloseableHttpResponse response = null; try { - final CloseableHttpResponse response; final HttpClientContext context = HttpClientContext.create(); if (preemptiveBasicAuth) { final AuthCache authCache = new BasicAuthCache(); @@ -442,11 +442,14 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing response = client.execute(getHost(request), request, context); HeaderUtils.checkHeaderChanges(clientHeadersSnapshot, clientRequest.getHeaders(), this.getClass().getName()); + final HttpEntity entity = response.getEntity(); + final InputStream entityContent = entity != null ? entity.getContent() : null; + final Response.StatusType status = response.getStatusLine().getReasonPhrase() == null ? Statuses.from(response.getStatusLine().getStatusCode()) : Statuses.from(response.getStatusLine().getStatusCode(), response.getStatusLine().getReasonPhrase()); - final ClientResponse responseContext = new ClientResponse(status, clientRequest); + final ClientResponse responseContext = new ApacheClientResponse(status, clientRequest, response, entityContent); final List redirectLocations = context.getRedirectLocations(); if (redirectLocations != null && !redirectLocations.isEmpty()) { responseContext.setResolvedRequestUri(redirectLocations.get(redirectLocations.size() - 1)); @@ -464,8 +467,6 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing headers.put(headerName, list); } - final HttpEntity entity = response.getEntity(); - if (entity != null) { if (headers.get(HttpHeaders.CONTENT_LENGTH) == null) { headers.add(HttpHeaders.CONTENT_LENGTH, String.valueOf(entity.getContentLength())); @@ -477,15 +478,18 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing } } - try { - responseContext.setEntityStream(new HttpClientResponseInputStream(getInputStream(response))); - } catch (final IOException e) { - LOGGER.log(Level.SEVERE, null, e); - } + responseContext.setEntityStream(bufferedStream(entityContent)); + + // prevent response-close on correct return + response = null; return responseContext; } catch (final Exception e) { throw new ProcessingException(e); + } finally { + if (response != null) { + ReaderWriter.safelyClose(response); + } } } @@ -617,40 +621,60 @@ private static Map writeOutBoundHeaders(final MultivaluedMapApache HttpClient + * documentation: + * + * The difference between closing the content stream and closing the response is that the former will attempt to keep + * the underlying connection alive by consuming the entity content while the latter immediately shuts down and discards + * the connection. + * + * JAX-RS spec is silent whether closing the content stream consumes the response or closes the connection. This + * ApacheConnector follows apache-behaviour. + */ + private static final class ApacheClientResponse extends ClientResponse { + + private final CloseableHttpResponse httpResponse; - HttpClientResponseInputStream(final InputStream inputStream) throws IOException { - super(inputStream); + private final InputStream entityContent; + + public ApacheClientResponse(StatusType status, ClientRequest requestContext, CloseableHttpResponse httpResponse, + InputStream entityContent) { + super(status, requestContext); + this.httpResponse = httpResponse; + this.entityContent = entityContent; } @Override - public void close() throws IOException { - super.close(); + public void close() { + try { + if (entityContent instanceof ConnectionReleaseTrigger) { + // necessary to prevent an exception during stream-close in apache httpclient 4.5.1+ + ((ConnectionReleaseTrigger) entityContent).abortConnection(); + } + httpResponse.close(); + } catch (IOException e) { + // Cannot happen according to ConnectionHolder#releaseConnection + throw new ProcessingException(e); + } finally { + super.close(); + } } } - private static InputStream getInputStream(final CloseableHttpResponse response) throws IOException { + private static InputStream bufferedStream(final InputStream entityContent) { final InputStream inputStream; - if (response.getEntity() == null) { + if (entityContent == null) { inputStream = new ByteArrayInputStream(new byte[0]); } else { - final InputStream i = response.getEntity().getContent(); - if (i.markSupported()) { - inputStream = i; - } else { - inputStream = new BufferedInputStream(i, ReaderWriter.BUFFER_SIZE); - } + inputStream = new BufferedInputStream(entityContent, ReaderWriter.BUFFER_SIZE); } - return new FilterInputStream(inputStream) { - @Override - public void close() throws IOException { - response.close(); - super.close(); - } - }; + return inputStream; } private static class ConnectionFactory extends ManagedHttpClientConnectionFactory { diff --git a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/ClosingTest.java b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/ClosingTest.java new file mode 100644 index 0000000000..74b28f56d6 --- /dev/null +++ b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/ClosingTest.java @@ -0,0 +1,180 @@ +package org.glassfish.jersey.apache.connector; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Test; + +public class ClosingTest extends JerseyTest { + + private PoolingHttpClientConnectionManager connectionManager; + + @Override + protected Application configure() { + ResourceConfig config = new ResourceConfig(); + config.register(TestResource.class); + return config; + } + + @Override + protected void configureClient(ClientConfig config) { + this.connectionManager = new PoolingHttpClientConnectionManager(60, TimeUnit.SECONDS); + config.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager); + config.connectorProvider(new ApacheConnectorProvider()); + } + + private int getLeasedConnections() { + return connectionManager.getTotalStats().getLeased(); + } + + private int getAvailableConnections() { + return connectionManager.getTotalStats().getAvailable(); + } + + private int getOpenConnections() { + return getLeasedConnections() + getAvailableConnections(); + } + + @Test + public void testClosingUnconsumedResponseAbortsConnection() throws Exception { + assertEquals(0, getOpenConnections()); + + Response response = target().path("productInfo") + .request(MediaType.TEXT_PLAIN_TYPE) + .get(); + assertEquals(200, response.getStatus()); + + assertEquals(1, getLeasedConnections()); + InputStream entityStream = response.readEntity(InputStream.class); + + // should close the connection without consuming it. must not throw here + response.close(); + assertEquals(0, getOpenConnections()); + + // must not throw here + entityStream.close(); + assertEquals(0, getOpenConnections()); + } + + @Test + public void testClosingUnconsumedStreamConsumesConnection() throws Exception { + assertEquals(0, getOpenConnections()); + + Response response = target().path("productInfo") + .request(MediaType.TEXT_PLAIN_TYPE) + .get(); + assertEquals(200, response.getStatus()); + + InputStream entityStream = response.readEntity(InputStream.class); + + // should consume the stream. must not throw here + entityStream.close(); + // connection should be kept alive after consume + assertEquals(0, getLeasedConnections()); + assertEquals(1, getAvailableConnections()); + + // must not throw here + response.close(); + assertEquals(0, getLeasedConnections()); + assertEquals(1, getAvailableConnections()); + } + + @Test + public void testClosingConsumedStream() throws Exception { + assertEquals(0, getOpenConnections()); + + Response response = target().path("productInfo") + .request(MediaType.TEXT_PLAIN_TYPE) + .get(); + assertEquals(200, response.getStatus()); + + InputStream entityStream = response.readEntity(InputStream.class); + + consume(entityStream); + + // connection should be kept alive after consume + assertEquals(0, getLeasedConnections()); + assertEquals(1, getAvailableConnections()); + + entityStream.close(); + response.close(); + + assertEquals(0, getLeasedConnections()); + assertEquals(1, getAvailableConnections()); + } + + @Test + public void testClosingConsumedResponse() throws Exception { + assertEquals(0, getOpenConnections()); + + Response response = target().path("productInfo") + .request(MediaType.TEXT_PLAIN_TYPE) + .get(); + assertEquals(200, response.getStatus()); + + InputStream entityStream = response.readEntity(InputStream.class); + + consume(entityStream); + + // connection should be kept alive after consume + assertEquals(0, getLeasedConnections()); + assertEquals(1, getAvailableConnections()); + + response.close(); + entityStream.close(); + + assertEquals(0, getLeasedConnections()); + assertEquals(1, getAvailableConnections()); + } + + @Test + public void testBufferedMultipleReadEntity() throws Exception { + assertEquals(0, getOpenConnections()); + + Response response = target().path("productInfo") + .request(MediaType.TEXT_PLAIN_TYPE) + .get(); + + response.bufferEntity(); + assertEquals(0, getLeasedConnections()); + assertEquals(1, getAvailableConnections()); + + assertEquals("foo\n", new String(response.readEntity(byte[].class), "us-ascii")); + assertEquals("foo\n", response.readEntity(String.class)); + + response.close(); + + assertEquals(0, getLeasedConnections()); + assertEquals(1, getAvailableConnections()); + } + + private static void consume(InputStream in) throws IOException { + byte[] buffer = new byte[1024]; + for (int readden = in.read(buffer); readden >= 0; readden = in.read(buffer)) { + } + } + + @Path("/") + public static class TestResource { + @GET + @Path("/productInfo") + @Produces(MediaType.TEXT_PLAIN) + public String getProductInfo() { + return "foo\n"; + } + } +} diff --git a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java deleted file mode 100644 index d345c07188..0000000000 --- a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. - * - * Copyright (c) 2015 Oracle and/or its affiliates. All rights reserved. - * - * The contents of this file are subject to the terms of either the GNU - * General Public License Version 2 only ("GPL") or the Common Development - * and Distribution License("CDDL") (collectively, the "License"). You - * may not use this file except in compliance with the License. You can - * obtain a copy of the License at - * http://glassfish.java.net/public/CDDL+GPL_1_1.html - * or packager/legal/LICENSE.txt. See the License for the specific - * language governing permissions and limitations under the License. - * - * When distributing the software, include this License Header Notice in each - * file and include the License file at packager/legal/LICENSE.txt. - * - * GPL Classpath Exception: - * Oracle designates this particular file as subject to the "Classpath" - * exception as provided by Oracle in the GPL Version 2 section of the License - * file that accompanied this code. - * - * Modifications: - * If applicable, add the following below the License Header, with the fields - * enclosed by brackets [] replaced by your own identifying information: - * "Portions Copyright [year] [name of copyright owner]" - * - * Contributor(s): - * If you wish your version of this file to be governed by only the CDDL or - * only the GPL Version 2, indicate your decision by adding "[Contributor] - * elects to include this software in this distribution under the [CDDL or GPL - * Version 2] license." If you don't indicate a single choice of license, a - * recipient has the option to distribute your version of this file under - * either the CDDL, the GPL Version 2 or to extend the choice of license to - * its licensees as provided above. However, if you add GPL Version 2 code - * and therefore, elected the GPL Version 2 license, then the option applies - * only if the new code is made subject to such option by the copyright - * holder. - */ - -package org.glassfish.jersey.apache.connector; - -import java.io.IOException; -import java.io.InputStream; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Application; -import javax.ws.rs.core.MediaType; - -import javax.inject.Singleton; - -import org.glassfish.jersey.client.ClientConfig; -import org.glassfish.jersey.server.ChunkedOutput; -import org.glassfish.jersey.server.ResourceConfig; -import org.glassfish.jersey.test.JerseyTest; - -import org.junit.Test; -import static org.junit.Assert.assertEquals; - -/** - * @author Petr Janouch (petr.janouch at oracle.com) - */ -public class StreamingTest extends JerseyTest { - - /** - * Test that a data stream can be terminated from the client side. - */ - @Test - public void clientCloseTest() throws IOException { - // start streaming - InputStream inputStream = target().path("/streamingEndpoint").request().get(InputStream.class); - - WebTarget sendTarget = target().path("/streamingEndpoint/send"); - // trigger sending 'A' to the stream; OK is sent if everything on the server was OK - assertEquals("OK", sendTarget.request().get().readEntity(String.class)); - // check 'A' has been sent - assertEquals('A', inputStream.read()); - // closing the stream should tear down the connection - inputStream.close(); - // trigger sending another 'A' to the stream; it should fail - // (indicating that the streaming has been terminated on the server) - assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); - } - - @Override - protected void configureClient(ClientConfig config) { - config.connectorProvider(new ApacheConnectorProvider()); - } - - @Override - protected Application configure() { - return new ResourceConfig(StreamingEndpoint.class); - } - - @Singleton - @Path("streamingEndpoint") - public static class StreamingEndpoint { - - private final ChunkedOutput output = new ChunkedOutput<>(String.class); - - @GET - @Path("send") - public String sendEvent() { - try { - output.write("A"); - } catch (IOException e) { - return "NOK"; - } - - return "OK"; - } - - @GET - @Produces(MediaType.TEXT_PLAIN) - public ChunkedOutput get() { - return output; - } - } -}