Skip to content

Commit 6d85324

Browse files
25-1: Fix readrows RPC (#17424)
Co-authored-by: AlexSm <alex@ydb.tech>
1 parent 808f447 commit 6d85324

File tree

1 file changed

+37
-15
lines changed

1 file changed

+37
-15
lines changed

ydb/core/grpc_services/rpc_read_rows.cpp

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ struct RequestedKeyColumn {
3737
Ydb::Type Type;
3838
};
3939

40+
struct TShardReadState {
41+
std::vector<TOwnedCellVec> Keys;
42+
ui32 FirstUnprocessedQuery = 0;
43+
};
44+
4045
}
4146

4247
namespace {
@@ -445,7 +450,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
445450
return (cmp < 0);
446451
});
447452
Y_ABORT_UNLESS(it != partitions.end());
448-
ShardIdToKeys[it->ShardId].emplace_back(std::move(key));
453+
ShardIdToReadState[it->ShardId].Keys.emplace_back(std::move(key));
449454
}
450455
}
451456

@@ -462,12 +467,13 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
462467
auto keyRange = resolvePartitionsResult->ResultSet[0].KeyDescription.Get();
463468

464469
CreateShardToKeysMapping(keyRange);
465-
for (const auto& [shardId, keys] : ShardIdToKeys) {
466-
SendRead(shardId, keys);
470+
for (const auto& [shardId, state] : ShardIdToReadState) {
471+
SendRead(shardId, state);
467472
}
468473
}
469474

470-
void SendRead(ui64 shardId, const std::vector<TOwnedCellVec>& keys) {
475+
void SendRead(ui64 shardId, const TShardReadState& readState) {
476+
auto& keys = readState.Keys;
471477
auto request = std::make_unique<TEvDataShard::TEvRead>();
472478
auto& record = request->Record;
473479

@@ -482,8 +488,8 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
482488

483489
record.SetResultFormat(::NKikimrDataEvents::FORMAT_CELLVEC);
484490

485-
for (auto& key : keys) {
486-
request->Keys.emplace_back(TSerializedCellVec::Serialize(key));
491+
for (size_t i = readState.FirstUnprocessedQuery; i < keys.size(); ++i) {
492+
request->Keys.emplace_back(TSerializedCellVec::Serialize(keys[i]));
487493
}
488494

489495
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC send TEvRead shardId : " << shardId << " keys.size(): " << keys.size());
@@ -500,7 +506,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
500506
// ReadRows can reply with the following statuses:
501507
// * SUCCESS
502508
// * INTERNAL_ERROR -- only if MaxRetries is reached
503-
// * OVERLOADED -- client will retrie it with backoff
509+
// * OVERLOADED -- client will retry it with backoff
504510
// * ABORTED -- code is used for all other DataShard errors
505511

506512
const auto& status = msg->Record.GetStatus();
@@ -509,17 +515,20 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
509515

510516
ui64 shardId = msg->Record.GetReadId();
511517

518+
auto it = ShardIdToReadState.find(shardId);
519+
if (it == ShardIdToReadState.end()) {
520+
TStringStream ss;
521+
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
522+
ReplyWithError(statusCode, ss.Str(), &issues);
523+
return;
524+
}
525+
512526
switch (statusCode) {
513527
case Ydb::StatusIds::SUCCESS:
514528
break;
515529
case Ydb::StatusIds::INTERNAL_ERROR: {
516-
auto it = ShardIdToKeys.find(shardId);
517530
++Retries;
518-
if (it == ShardIdToKeys.end()) {
519-
TStringStream ss;
520-
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
521-
ReplyWithError(statusCode, ss.Str(), &issues);
522-
} else if (Retries < MaxTotalRetries) {
531+
if (Retries < MaxTotalRetries) {
523532
TStringStream ss;
524533
ss << "Reached MaxRetries count for DataShard# " << shardId << ", status# " << statusCode;
525534
ReplyWithError(statusCode, ss.Str(), &issues);
@@ -540,8 +549,21 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
540549
return;
541550
}
542551
}
552+
if (!msg->Record.HasFinished() || !msg->Record.GetFinished()) {
553+
// We should have received continuation token if read is not finished.
554+
TMaybe<TString> continuationToken = msg->Record.GetContinuationToken();
555+
556+
Y_ABORT_UNLESS(continuationToken);
557+
558+
NKikimrTxDataShard::TReadContinuationToken token;
559+
Y_ABORT_UNLESS(token.ParseFromString(*continuationToken), "Failed to parse continuation token");
560+
561+
// Save continuation token in case we will have to retry on error, but for now
562+
// we just wait for the next batch of results.
563+
it->second.FirstUnprocessedQuery = token.GetFirstUnprocessedQuery();
564+
ReadsInFlight++;
565+
}
543566
}
544-
Y_ABORT_UNLESS(msg->Record.HasFinished() && msg->Record.GetFinished());
545567
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC TEvReadResult RowsCount: " << msg->GetRowsCount());
546568

547569
EvReadResults.emplace_back(ev->Release().Release());
@@ -761,7 +783,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
761783
};
762784
TVector<TColumnMeta> RequestedColumnsMeta;
763785

764-
std::map<ui64, std::vector<TOwnedCellVec>> ShardIdToKeys;
786+
std::map<ui64, TShardReadState> ShardIdToReadState;
765787
std::vector<std::unique_ptr<TEvDataShard::TEvReadResult>> EvReadResults;
766788
// TEvRead interface
767789
ui64 ReadsInFlight = 0;

0 commit comments

Comments
 (0)