-
Notifications
You must be signed in to change notification settings - Fork 3.9k
11246 :: Unexpected error when server expands a compressed message to learn it is too large #12360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 12 commits
fe36e6d
61fbe27
bb929aa
1024a7f
5410196
14adfdb
631edd3
92b75a8
894dc66
65fbebb
3b9ebbe
55741e4
26095ef
f16cd6b
a511a77
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -330,8 +330,17 @@ private void messagesAvailableInternal(final MessageProducer producer) { | |
| InputStream message; | ||
| try { | ||
| while ((message = producer.next()) != null) { | ||
| ReqT parsedMessage; | ||
| try { | ||
| listener.onMessage(call.method.parseRequest(message)); | ||
| parsedMessage = call.method.parseRequest(message); | ||
| } catch (StatusRuntimeException e) { | ||
| GrpcUtil.closeQuietly(message); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. additionally need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted. |
||
| call.cancelled = true; | ||
| call.close(e.getStatus(), new Metadata()); | ||
| return; | ||
| } | ||
| try { | ||
| listener.onMessage(parsedMessage); | ||
| } catch (Throwable t) { | ||
| GrpcUtil.closeQuietly(message); | ||
|
||
| throw t; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,9 +48,11 @@ | |
| import io.grpc.SecurityLevel; | ||
| import io.grpc.ServerCall; | ||
| import io.grpc.Status; | ||
| import io.grpc.StatusRuntimeException; | ||
| import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; | ||
| import io.perfmark.PerfMark; | ||
| import java.io.ByteArrayInputStream; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.InputStreamReader; | ||
| import org.junit.Before; | ||
|
|
@@ -69,6 +71,8 @@ public class ServerCallImplTest { | |
|
|
||
| @Mock private ServerStream stream; | ||
| @Mock private ServerCall.Listener<Long> callListener; | ||
| @Mock private StreamListener.MessageProducer messageProducer; | ||
| @Mock private InputStream message; | ||
|
|
||
| private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create(); | ||
| private ServerCallImpl<Long, Long> call; | ||
|
|
@@ -493,6 +497,45 @@ public void streamListener_unexpectedRuntimeException() { | |
| assertThat(e).hasMessageThat().isEqualTo("unexpected exception"); | ||
| } | ||
|
|
||
| @Test | ||
| public void streamListener_statusRuntimeException() throws IOException { | ||
| MethodDescriptor<Long, Long> failingParseMethod = MethodDescriptor.<Long, Long>newBuilder() | ||
| .setType(MethodType.UNARY) | ||
| .setFullMethodName("service/method") | ||
| .setRequestMarshaller(new LongMarshaller() { | ||
| @Override | ||
| public Long parse(InputStream stream) { | ||
| throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED | ||
| .withDescription("Decompressed gRPC message exceeds maximum size")); | ||
| } | ||
| }) | ||
| .setResponseMarshaller(new LongMarshaller()) | ||
| .build(); | ||
|
|
||
| call = new ServerCallImpl<>(stream, failingParseMethod, requestHeaders, context, | ||
| DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), | ||
| serverCallTracer, PerfMark.createTag()); | ||
|
|
||
| ServerStreamListenerImpl<Long> streamListener = | ||
| new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context); | ||
|
|
||
| when(messageProducer.next()).thenReturn(message, (InputStream) null); | ||
| streamListener.messagesAvailable(messageProducer); | ||
| ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); | ||
| ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); | ||
|
|
||
| verify(stream).close(statusCaptor.capture(),metadataCaptor.capture()); | ||
|
||
| Status status = statusCaptor.getValue(); | ||
| assertEquals(Status.RESOURCE_EXHAUSTED.getCode(), status.getCode()); | ||
| assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); | ||
|
|
||
| streamListener.halfClosed(); | ||
| verify(callListener, never()).onHalfClose(); | ||
| verify(messageProducer).next(); | ||
| verify(message).close(); | ||
| verify(callListener,never()).onMessage(any()); | ||
| } | ||
|
|
||
| private static class LongMarshaller implements Marshaller<Long> { | ||
| @Override | ||
| public InputStream stream(Long value) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| package io.grpc.testing.integration; | ||
|
|
||
| import static org.junit.Assert.assertEquals; | ||
| import static org.junit.Assert.assertThrows; | ||
| import static org.junit.Assert.assertTrue; | ||
|
|
||
| import com.google.protobuf.ByteString; | ||
|
|
@@ -37,6 +38,8 @@ | |
| import io.grpc.ServerCall.Listener; | ||
| import io.grpc.ServerCallHandler; | ||
| import io.grpc.ServerInterceptor; | ||
| import io.grpc.Status.Code; | ||
| import io.grpc.StatusRuntimeException; | ||
| import io.grpc.internal.GrpcUtil; | ||
| import io.grpc.netty.InternalNettyChannelBuilder; | ||
| import io.grpc.netty.InternalNettyServerBuilder; | ||
|
|
@@ -53,7 +56,9 @@ | |
| import java.io.OutputStream; | ||
| import org.junit.Before; | ||
| import org.junit.BeforeClass; | ||
| import org.junit.Rule; | ||
| import org.junit.Test; | ||
| import org.junit.rules.TestName; | ||
| import org.junit.runner.RunWith; | ||
| import org.junit.runners.JUnit4; | ||
|
|
||
|
|
@@ -84,10 +89,16 @@ public static void registerCompressors() { | |
| compressors.register(Codec.Identity.NONE); | ||
| } | ||
|
|
||
| @Rule | ||
| public final TestName currentTest = new TestName(); | ||
|
|
||
| @Override | ||
| protected ServerBuilder<?> getServerBuilder() { | ||
| NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) | ||
| .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) | ||
| .maxInboundMessageSize( | ||
| DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME.equals(currentTest.getMethodName()) | ||
| ? 1000 | ||
| : AbstractInteropTest.MAX_MESSAGE_SIZE) | ||
| .compressorRegistry(compressors) | ||
| .decompressorRegistry(decompressors) | ||
| .intercept(new ServerInterceptor() { | ||
|
|
@@ -126,6 +137,22 @@ public void compresses() { | |
| assertTrue(FZIPPER.anyWritten); | ||
| } | ||
|
|
||
| private static final String DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME = | ||
| "decompressedMessageTooLong"; | ||
|
|
||
| @Test | ||
| public void decompressedMessageTooLong() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This e2e test is great but the unit test you previously wrote in ServerCallImplTest also lets us test code paths in a more fine-grained way. In particular lets re-introduce that test and add the following after causing the parse exception to happen in message handling - in order to test that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @kannanjgithub for the review and suggestions. I've addressed the comments , please review the changes when you have a moment. |
||
| assertEquals(DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME, currentTest.getMethodName()); | ||
| final SimpleRequest bigRequest = SimpleRequest.newBuilder() | ||
| .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[10_000]))) | ||
| .build(); | ||
| StatusRuntimeException e = assertThrows(StatusRuntimeException.class, | ||
| () -> blockingStub.withCompression("gzip").unaryCall(bigRequest)); | ||
| assertCodeEquals(Code.RESOURCE_EXHAUSTED, e.getStatus()); | ||
| assertEquals("Decompressed gRPC message exceeds maximum size 1000", | ||
| e.getStatus().getDescription()); | ||
| } | ||
|
|
||
| @Override | ||
| protected NettyChannelBuilder createChannelBuilder() { | ||
| NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like can be moved below - inside try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted.