Skip to content

Commit 6a1b21f

Browse files
jensmatwspencergibb
authored andcommitted
Adds flushing after each buffer write in RestClientProxyExchange in case of text/event-stream
Fixes gh-3410 Fixes gh-3486
1 parent c3ea4bb commit 6a1b21f

File tree

4 files changed

+172
-2
lines changed

4 files changed

+172
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2013-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.gateway.server.mvc.common;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.OutputStream;
22+
23+
import org.springframework.http.client.ClientHttpResponse;
24+
import org.springframework.util.Assert;
25+
import org.springframework.util.StreamUtils;
26+
27+
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
28+
29+
public abstract class HttpUtils {
30+
31+
private static final int BUFFER_SIZE = 16384;
32+
33+
private HttpUtils() {
34+
throw new AssertionError("Must not instantiate utility class.");
35+
}
36+
37+
public static int copyResponseBody(ClientHttpResponse clientResponse, InputStream inputStream,
38+
OutputStream outputStream) throws IOException {
39+
Assert.notNull(clientResponse, "No ClientResponse specified");
40+
Assert.notNull(inputStream, "No InputStream specified");
41+
Assert.notNull(outputStream, "No OutputStream specified");
42+
43+
int transferredBytes;
44+
45+
if (TEXT_EVENT_STREAM.equals(clientResponse.getHeaders().getContentType())) {
46+
transferredBytes = copyResponseBodyWithFlushing(inputStream, outputStream);
47+
}
48+
else {
49+
transferredBytes = StreamUtils.copy(inputStream, outputStream);
50+
}
51+
52+
return transferredBytes;
53+
}
54+
55+
private static int copyResponseBodyWithFlushing(InputStream inputStream, OutputStream outputStream)
56+
throws IOException {
57+
int readBytes;
58+
var totalReadBytes = 0;
59+
var buffer = new byte[BUFFER_SIZE];
60+
61+
while ((readBytes = inputStream.read(buffer)) != -1) {
62+
outputStream.write(buffer, 0, readBytes);
63+
outputStream.flush();
64+
if (totalReadBytes < Integer.MAX_VALUE) {
65+
try {
66+
totalReadBytes = Math.addExact(totalReadBytes, readBytes);
67+
}
68+
catch (ArithmeticException e) {
69+
totalReadBytes = Integer.MAX_VALUE;
70+
}
71+
}
72+
}
73+
74+
outputStream.flush();
75+
76+
return totalReadBytes;
77+
}
78+
79+
}

spring-cloud-gateway-server-mvc/src/main/java/org/springframework/cloud/gateway/server/mvc/handler/ClientHttpRequestFactoryProxyExchange.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.InputStream;
2121
import java.io.UncheckedIOException;
2222

23+
import org.springframework.cloud.gateway.server.mvc.common.HttpUtils;
2324
import org.springframework.cloud.gateway.server.mvc.common.MvcUtils;
2425
import org.springframework.http.client.ClientHttpRequest;
2526
import org.springframework.http.client.ClientHttpRequestFactory;
@@ -54,7 +55,8 @@ public ServerResponse exchange(Request request) {
5455
InputStream inputStream = MvcUtils.getAttribute(request.getServerRequest(),
5556
MvcUtils.CLIENT_RESPONSE_INPUT_STREAM_ATTR);
5657
// copy body from request to clientHttpRequest
57-
StreamUtils.copy(inputStream, httpServletResponse.getOutputStream());
58+
HttpUtils.copyResponseBody(clientHttpResponse, inputStream,
59+
httpServletResponse.getOutputStream());
5860
}
5961
return null;
6062
});

spring-cloud-gateway-server-mvc/src/main/java/org/springframework/cloud/gateway/server/mvc/handler/RestClientProxyExchange.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.OutputStream;
2222
import java.io.UncheckedIOException;
2323

24+
import org.springframework.cloud.gateway.server.mvc.common.HttpUtils;
2425
import org.springframework.cloud.gateway.server.mvc.common.MvcUtils;
2526
import org.springframework.http.client.ClientHttpResponse;
2627
import org.springframework.util.StreamUtils;
@@ -71,7 +72,7 @@ private static ServerResponse doExchange(Request request, ClientHttpResponse cli
7172
InputStream inputStream = MvcUtils.getAttribute(request.getServerRequest(),
7273
MvcUtils.CLIENT_RESPONSE_INPUT_STREAM_ATTR);
7374
// copy body from request to clientHttpRequest
74-
StreamUtils.copy(inputStream, httpServletResponse.getOutputStream());
75+
HttpUtils.copyResponseBody(clientResponse, inputStream, httpServletResponse.getOutputStream());
7576
}
7677
return null;
7778
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2013-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.gateway.server.mvc.common;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.OutputStream;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import org.springframework.http.MediaType;
26+
import org.springframework.mock.http.client.MockClientHttpResponse;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.times;
32+
import static org.mockito.Mockito.verify;
33+
import static org.mockito.Mockito.when;
34+
35+
/**
36+
* @author Jens Mallien
37+
*/
38+
public class HttpUtilsTests {
39+
40+
@Test
41+
public void copyResponseBodyForJson() throws IOException {
42+
MockClientHttpResponse mockResponse = new MockClientHttpResponse(new byte[0], 200);
43+
mockResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON);
44+
45+
InputStream inputStream = mock(InputStream.class);
46+
when(inputStream.transferTo(any())).thenReturn(3L);
47+
OutputStream outputStream = mock(OutputStream.class);
48+
49+
int result = HttpUtils.copyResponseBody(mockResponse, inputStream, outputStream);
50+
51+
assertThat(result).isEqualTo(3);
52+
verify(outputStream, times(1)).flush();
53+
}
54+
55+
@Test
56+
public void copyResponseBodyForTextEventStream() throws IOException {
57+
MockClientHttpResponse mockResponse = new MockClientHttpResponse(new byte[0], 200);
58+
mockResponse.getHeaders().setContentType(MediaType.TEXT_EVENT_STREAM);
59+
60+
InputStream inputStream = mock(InputStream.class);
61+
when(inputStream.read(any()))
62+
.thenReturn(1)
63+
.thenReturn(1)
64+
.thenReturn(1)
65+
.thenReturn(-1);
66+
OutputStream outputStream = mock(OutputStream.class);
67+
68+
int result = HttpUtils.copyResponseBody(mockResponse, inputStream, outputStream);
69+
70+
assertThat(result).isEqualTo(3);
71+
verify(outputStream, times(4)).flush();
72+
}
73+
74+
@Test
75+
public void copyResponseBodyWithoutContentType() throws IOException {
76+
MockClientHttpResponse mockResponse = new MockClientHttpResponse(new byte[0], 200);
77+
78+
InputStream inputStream = mock(InputStream.class);
79+
when(inputStream.transferTo(any())).thenReturn(3L);
80+
OutputStream outputStream = mock(OutputStream.class);
81+
82+
int result = HttpUtils.copyResponseBody(mockResponse, inputStream, outputStream);
83+
84+
assertThat(result).isEqualTo(3);
85+
verify(outputStream, times(1)).flush();
86+
}
87+
88+
}

0 commit comments

Comments
 (0)