Skip to content

Commit 8c1f77a

Browse files
authored
[core] Use async next for grpc server (#51378)
It's recommended to use AsyncNext when you bring your own threading model, as we do here by polling the completion queue from our own polling threads. Otherwise, Next will use its own internal threading model. --------- Signed-off-by: dayshah <dhyey2019@gmail.com>
1 parent ae15926 commit 8c1f77a

File tree

3 files changed

+19
-4
lines changed

3 files changed

+19
-4
lines changed

src/ray/rpc/client_call.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ class ClientCallManager {
308308
} else if (status != grpc::CompletionQueue::TIMEOUT) {
309309
// NOTE: CompletionQueue::TIMEOUT and gRPC deadline exceeded are different.
310310
// If the client deadline is exceeded, event is obtained at this block.
311-
auto tag = reinterpret_cast<ClientCallTag *>(got_tag);
311+
auto tag = static_cast<ClientCallTag *>(got_tag);
312312
// Refresh the tag.
313313
got_tag = nullptr;
314314
tag->GetCall()->SetReturnStatus();

src/ray/rpc/grpc_server.cc

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ void GrpcServer::Init() {
4343

4444
void GrpcServer::Shutdown() {
4545
if (!is_closed_) {
46+
shutdown_ = true;
4647
// Drain the executor threads.
4748
// Shutdown the server with an immediate deadline.
4849
// TODO(edoakes): do we want to do this in all cases?
@@ -189,7 +190,19 @@ void GrpcServer::PollEventsFromCompletionQueue(int index) {
189190
bool ok;
190191

191192
// Keep reading events from the `CompletionQueue` until it's shutdown.
192-
while (cqs_[index]->Next(&tag, &ok)) {
193+
while (true) {
194+
auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
195+
gpr_time_from_millis(250, GPR_TIMESPAN));
196+
auto status = cqs_[index]->AsyncNext(&tag, &ok, deadline);
197+
if (status == grpc::CompletionQueue::SHUTDOWN ||
198+
(status == grpc::CompletionQueue::TIMEOUT && shutdown_)) {
199+
// If we timed out and shutdown, then exit immediately. This should not
200+
// be needed, but gRPC seems to not return SHUTDOWN correctly in these
201+
// cases (e.g., test_wait will hang on shutdown without this check).
202+
break;
203+
} else if (status == grpc::CompletionQueue::TIMEOUT) {
204+
continue;
205+
}
193206
auto *server_call = static_cast<ServerCall *>(tag);
194207
bool delete_call = false;
195208
// A new call is needed after the server sends a reply, no matter the reply is
@@ -241,6 +254,5 @@ void GrpcServer::PollEventsFromCompletionQueue(int index) {
241254
}
242255
}
243256
}
244-
245257
} // namespace rpc
246258
} // namespace ray

src/ray/rpc/grpc_server.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ class GrpcServer {
105105
cluster_id_(ClusterID::Nil()),
106106
is_closed_(true),
107107
num_threads_(num_threads),
108-
keepalive_time_ms_(keepalive_time_ms) {
108+
keepalive_time_ms_(keepalive_time_ms),
109+
shutdown_(false) {
109110
Init();
110111
}
111112

@@ -182,6 +183,8 @@ class GrpcServer {
182183
/// gRPC server cannot get the ping response within the time, it triggers
183184
/// the watchdog timer fired error, which will close the connection.
184185
const int64_t keepalive_time_ms_;
186+
187+
std::atomic_bool shutdown_;
185188
};
186189

187190
/// Base class that represents an abstract gRPC service.

0 commit comments

Comments
 (0)