Skip to content

Added option to control the grpc stream flow #505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcFlowControl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package tech.ydb.core.grpc;

import java.util.function.IntConsumer;

/**
*
* @author Aleksandr Gorshenin
*/
@FunctionalInterface
public interface GrpcFlowControl {
interface Call {
void onStart();
void onMessageRead();
}

Call newCall(IntConsumer req);
}
1 change: 1 addition & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcReadStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* @param <R> type of message received
*/
public interface GrpcReadStream<R> {
@FunctionalInterface
interface Observer<R> {
void onNext(R value);
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import io.grpc.Metadata;

import tech.ydb.core.impl.call.GrpcFlows;

/**
* @author Nikolay Perfilov
*/
Expand All @@ -16,13 +18,15 @@ public class GrpcRequestSettings {
private final String traceId;
private final List<String> clientCapabilities;
private final Consumer<Metadata> trailersHandler;
private final GrpcFlowControl flowControl;

private GrpcRequestSettings(Builder builder) {
this.deadlineAfter = builder.deadlineAfter;
this.preferredNodeID = builder.preferredNodeID;
this.traceId = builder.traceId;
this.clientCapabilities = builder.clientCapabilities;
this.trailersHandler = builder.trailersHandler;
this.flowControl = builder.flowControl;
}

public static Builder newBuilder() {
Expand All @@ -49,12 +53,17 @@ public Consumer<Metadata> getTrailersHandler() {
return trailersHandler;
}

public GrpcFlowControl getFlowControl() {
return flowControl;
}

public static final class Builder {
private long deadlineAfter = 0L;
private Integer preferredNodeID = null;
private String traceId = null;
private List<String> clientCapabilities = null;
private Consumer<Metadata> trailersHandler = null;
private GrpcFlowControl flowControl = GrpcFlows.SIMPLE_FLOW;

/**
* Returns a new {@code Builder} with a deadline, based on the running Java Virtual Machine's
Expand Down Expand Up @@ -95,6 +104,11 @@ public Builder withTraceId(String traceId) {
return this;
}

public Builder withFlowControl(GrpcFlowControl flowCtrl) {
this.flowControl = flowCtrl;
return this;
}

public Builder withClientCapabilities(List<String> clientCapabilities) {
this.clientCapabilities = clientCapabilities;
return this;
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcFlowControl;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
Expand Down Expand Up @@ -132,7 +133,9 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
);
}

return new ReadStreamCall<>(traceId, call, request, makeMetadataFromSettings(settings), handler);
Metadata metadata = makeMetadataFromSettings(settings);
GrpcFlowControl flowCtrl = settings.getFlowControl();
return new ReadStreamCall<>(traceId, call, flowCtrl, request, metadata, handler);
} catch (UnexpectedResultException ex) {
logger.warn("ReadStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
return new EmptyStream<>(ex.getStatus());
Expand Down Expand Up @@ -175,9 +178,9 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
);
}

return new ReadWriteStreamCall<>(
traceId, call, makeMetadataFromSettings(settings), getAuthCallOptions(), handler
);
Metadata metadata = makeMetadataFromSettings(settings);
GrpcFlowControl flowCtrl = settings.getFlowControl();
return new ReadWriteStreamCall<>(traceId, call, flowCtrl, metadata, getAuthCallOptions(), handler);
} catch (UnexpectedResultException ex) {
logger.warn("ReadWriteStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
return new EmptyStream<>(ex.getStatus());
Expand Down
33 changes: 33 additions & 0 deletions core/src/main/java/tech/ydb/core/impl/call/GrpcFlows.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package tech.ydb.core.impl.call;

import java.util.function.IntConsumer;

import tech.ydb.core.grpc.GrpcFlowControl;

/**
*
* @author Aleksandr Gorshenin
*/
public class GrpcFlows {
public static final GrpcFlowControl SIMPLE_FLOW = SimpleCall::new;

private GrpcFlows() { }

private static class SimpleCall implements GrpcFlowControl.Call {
private final IntConsumer req;

SimpleCall(IntConsumer req) {
this.req = req;
}

@Override
public void onStart() {
req.accept(1);
}

@Override
public void onMessageRead() {
req.accept(1);
}
}
}
65 changes: 36 additions & 29 deletions core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -16,6 +15,7 @@
import org.slf4j.LoggerFactory;

import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcFlowControl;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;
Expand All @@ -35,48 +35,51 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
private final GrpcStatusHandler statusConsumer;
private final ReqT request;
private final Metadata headers;
private final GrpcFlowControl.Call flow;

private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
private final AtomicReference<Observer<RespT>> observerReference = new AtomicReference<>();

public ReadStreamCall(
String traceId,
ClientCall<ReqT, RespT> call,
ReqT request,
Metadata headers,
GrpcStatusHandler statusConsumer
) {

private Observer<RespT> consumer;

public ReadStreamCall(String traceId, ClientCall<ReqT, RespT> call, GrpcFlowControl flowCtrl, ReqT req,
Metadata headers, GrpcStatusHandler statusHandler) {
this.traceId = traceId;
this.call = call;
this.request = request;
this.request = req;
this.headers = headers;
this.statusConsumer = statusConsumer;
this.statusConsumer = statusHandler;
this.flow = flowCtrl.newCall(this::nextRequest);
}

@Override
public CompletableFuture<Status> start(Observer<RespT> observer) {
if (!observerReference.compareAndSet(null, observer)) {
throw new IllegalStateException("Read stream call is already started");
}

callLock.lock();
try {
if (consumer != null) {
throw new IllegalStateException("Read stream call is already started");
}
if (observer == null) {
throw new IllegalArgumentException("Observer must be not empty");
}

consumer = observer;
call.start(this, headers);
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
}
call.sendMessage(request);
// close stream by client side
call.halfClose();
call.request(1);
} catch (Throwable t) {
// init flow
flow.onStart();
} catch (Throwable th) {
statusFuture.completeExceptionally(th);

try {
call.cancel(null, t);
call.cancel(null, th);
} catch (Throwable ex) {
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
}

statusFuture.completeExceptionally(t);
} finally {
callLock.unlock();
}
Expand All @@ -94,20 +97,24 @@ public void cancel() {
}
}

private void nextRequest(int count) {
// request delivery of the next inbound message.
callLock.lock();
try {
call.request(count);
} finally {
callLock.unlock();
}
}

@Override
public void onMessage(RespT message) {
try {
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message));
}
observerReference.get().onNext(message);
// request delivery of the next inbound message.
callLock.lock();
try {
call.request(1);
} finally {
callLock.unlock();
}
consumer.onNext(message);
flow.onMessageRead();
} catch (Exception ex) {
statusFuture.completeExceptionally(ex);

Expand Down
57 changes: 31 additions & 26 deletions core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -18,6 +17,7 @@
import org.slf4j.LoggerFactory;

import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcFlowControl;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;
Expand All @@ -39,23 +39,21 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements
private final GrpcStatusHandler statusConsumer;
private final Metadata headers;
private final AuthCallOptions callOptions;
private final GrpcFlowControl.Call flow;

private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
private final AtomicReference<Observer<R>> observerReference = new AtomicReference<>();
private final Queue<W> messagesQueue = new ArrayDeque<>();

public ReadWriteStreamCall(
String traceId,
ClientCall<W, R> call,
Metadata headers,
AuthCallOptions callOptions,
GrpcStatusHandler statusConsumer
) {
private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
private Observer<R> consumer = null;

public ReadWriteStreamCall(String traceId, ClientCall<W, R> call, GrpcFlowControl flowCtrl,
Metadata headers, AuthCallOptions options, GrpcStatusHandler statusHandler) {
this.traceId = traceId;
this.call = call;
this.headers = headers;
this.statusConsumer = statusConsumer;
this.callOptions = callOptions;
this.statusConsumer = statusHandler;
this.callOptions = options;
this.flow = flowCtrl.newCall(this::nextRequest);
}

@Override
Expand All @@ -65,14 +63,18 @@ public String authToken() {

@Override
public CompletableFuture<Status> start(Observer<R> observer) {
if (!observerReference.compareAndSet(null, observer)) {
throw new IllegalStateException("Read stream call is already started");
}

callLock.lock();
try {
if (consumer != null) {
throw new IllegalStateException("Read write stream call is already started");
}
if (observer == null) {
throw new IllegalArgumentException("Observer must be not empty");
}
consumer = observer;
call.start(this, headers);
call.request(1);
// init flow control
flow.onStart();
} catch (Throwable t) {
try {
call.cancel(null, t);
Expand Down Expand Up @@ -133,24 +135,27 @@ public void cancel() {
}
}

private void nextRequest(int count) {
// request delivery of the next inbound message.
callLock.lock();
try {
call.request(count);
} finally {
callLock.unlock();
}
}

@Override
public void onMessage(R message) {
try {
if (logger.isTraceEnabled()) {
logger.trace("ReadWriteStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message));
}

observerReference.get().onNext(message);
// request delivery of the next inbound message.
callLock.lock();
try {
call.request(1);
} finally {
callLock.unlock();
}
consumer.onNext(message);
flow.onMessageRead();
} catch (Exception ex) {
statusFuture.completeExceptionally(ex);

try {
callLock.lock();
try {
Expand Down
Loading