Skip to content

Commit 89e053a

Browse files
authored
Set gRPC message limit to 2GB (#746)
1 parent 79045b2 commit 89e053a

File tree

4 files changed

+37
-3
lines changed

4 files changed

+37
-3
lines changed

langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,11 @@ public void start() throws Exception {
100100
}
101101
blockingStub =
102102
AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS);
103-
asyncStub = AgentServiceGrpc.newStub(channel).withWaitForReady();
103+
asyncStub =
104+
AgentServiceGrpc.newStub(channel)
105+
.withWaitForReady()
106+
.withMaxInboundMessageSize(Integer.MAX_VALUE)
107+
.withMaxOutboundMessageSize(Integer.MAX_VALUE);
104108

105109
topicProducerWriteResults = new CompletableFuture<>();
106110
topicProducerWriteResults.complete(

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,12 @@ def get_topic_producer(self) -> TopicProducer:
415415
class AgentServer(object):
416416
def __init__(self, target: str):
417417
self.target = target
418-
self.grpc_server = grpc.aio.server()
418+
self.grpc_server = grpc.aio.server(
419+
options=[
420+
("grpc.max_send_message_length", 0x7FFFFFFF),
421+
("grpc.max_receive_message_length", 0x7FFFFFFF),
422+
]
423+
)
419424
self.port = self.grpc_server.add_insecure_port(target)
420425
self.agent = None
421426

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,13 @@ async def __aenter__(self):
3636
self.server = AgentServer("[::]:0")
3737
await self.server.init(json.dumps(self.config), json.dumps(self.context))
3838
await self.server.start()
39-
self.channel = grpc.aio.insecure_channel("localhost:%d" % self.server.port)
39+
self.channel = grpc.aio.insecure_channel(
40+
"localhost:%d" % self.server.port,
41+
options=[
42+
("grpc.max_send_message_length", 0x7FFFFFFF),
43+
("grpc.max_receive_message_length", 0x7FFFFFFF),
44+
],
45+
)
4046
self.stub = AgentServiceStub(channel=self.channel)
4147
return self
4248

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,25 @@ async def test_future_record(klass):
201201
assert response.results[0].records[0].value.string_value == "test"
202202

203203

204+
async def test_big_record():
205+
async with ServerAndStub(
206+
"langstream_grpc.tests.test_grpc_processor.MyProcessor"
207+
) as server_and_stub:
208+
long_string = "a" * 10_000_000
209+
response: ProcessorResponse
210+
async for response in server_and_stub.stub.process(
211+
[
212+
ProcessorRequest(
213+
records=[GrpcRecord(value=Value(string_value=long_string))]
214+
)
215+
]
216+
):
217+
assert len(response.results) == 1
218+
assert response.results[0].HasField("error") is False
219+
assert len(response.results[0].records) == 1
220+
assert response.results[0].records[0].value.string_value == long_string
221+
222+
204223
async def test_info():
205224
async with ServerAndStub(
206225
"langstream_grpc.tests.test_grpc_processor.MyProcessor"

0 commit comments

Comments
 (0)