Skip to content

Commit d03dd3b

Browse files
committed
fix stream lookup square and add simple overload checker (#18922)
1 parent 9888760 commit d03dd3b

File tree

3 files changed

+105
-55
lines changed

3 files changed

+105
-55
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 87 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
9999
}
100100
}
101101

102+
auto affectedShards = Reads.AffectedShards();
102103
// TODO: use evread statistics after KIKIMR-16924
103104
tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate);
104105
tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate);
105-
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());
106+
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + affectedShards.size());
106107

107108
NKqpProto::TKqpTableExtraStats tableExtraStats;
108109
auto readActorTableAggrExtraStats = tableExtraStats.MutableReadActorTableAggrExtraStats();
109-
for (const auto& [shardId, _] : ReadsPerShard) {
110+
for (const auto& shardId : affectedShards) {
110111
readActorTableAggrExtraStats->AddAffectedShards(shardId);
111112
}
112113

@@ -137,10 +138,6 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
137138
, ShardId(shardId)
138139
, State(EReadState::Initial) {}
139140

140-
void SetFinished() {
141-
State = EReadState::Finished;
142-
}
143-
144141
bool Finished() const {
145142
return (State == EReadState::Finished);
146143
}
@@ -159,7 +156,68 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
159156

160157
struct TShardState {
161158
ui64 RetryAttempts = 0;
162-
std::vector<TReadState*> Reads;
159+
std::unordered_set<ui64> Reads;
160+
};
161+
162+
struct TReads {
163+
std::unordered_map<ui64, TReadState> Reads;
164+
std::unordered_map<ui64, TShardState> ReadsPerShard;
165+
166+
std::unordered_map<ui64, TReadState>::iterator begin() { return Reads.begin(); }
167+
168+
std::unordered_map<ui64, TReadState>::iterator end() { return Reads.end(); }
169+
170+
std::unordered_map<ui64, TReadState>::iterator find(ui64 readId) {
171+
return Reads.find(readId);
172+
}
173+
174+
void insert(TReadState&& read) {
175+
const auto [readIt, succeeded] = Reads.insert({read.Id, std::move(read)});
176+
YQL_ENSURE(succeeded);
177+
ReadsPerShard[readIt->second.ShardId].Reads.emplace(readIt->second.Id);
178+
}
179+
180+
size_t InFlightReads() const {
181+
return Reads.size();
182+
}
183+
184+
std::vector<ui64> AffectedShards() const {
185+
std::vector<ui64> result;
186+
result.reserve(ReadsPerShard.size());
187+
for(const auto& [shard, _]: ReadsPerShard) {
188+
result.push_back(shard);
189+
}
190+
return result;
191+
}
192+
193+
bool CheckShardRetriesExeeded(TReadState& failedRead) {
194+
const auto& shardState = ReadsPerShard[failedRead.ShardId];
195+
return shardState.RetryAttempts + 1 > MaxShardRetries();
196+
}
197+
198+
TDuration CalcDelayForShard(TReadState& failedRead, bool allowInstantRetry) {
199+
auto& shardState = ReadsPerShard[failedRead.ShardId];
200+
++shardState.RetryAttempts;
201+
return CalcDelay(shardState.RetryAttempts, allowInstantRetry);
202+
}
203+
204+
void erase(TReadState& read) {
205+
ReadsPerShard[read.ShardId].Reads.erase(read.Id);
206+
Reads.erase(read.Id);
207+
}
208+
209+
std::vector<TReadState*> GetShardReads(ui64 shardId) {
210+
auto it = ReadsPerShard.find(shardId);
211+
YQL_ENSURE(it != ReadsPerShard.end());
212+
std::vector<TReadState*> result;
213+
for(ui64 readId: it->second.Reads) {
214+
auto it = Reads.find(readId);
215+
YQL_ENSURE(it != Reads.end());
216+
result.push_back(&it->second);
217+
}
218+
219+
return result;
220+
}
163221
};
164222

165223
struct TEvPrivate {
@@ -224,13 +282,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
224282
ReadRowsCount += replyResultStats.ReadRowsCount;
225283
ReadBytesCount += replyResultStats.ReadBytesCount;
226284

227-
auto status = FetchInputRows();
285+
if (!StreamLookupWorker->IsOverloaded()) {
286+
FetchInputRows();
287+
}
228288

229289
if (Partitioning) {
230290
ProcessInputRows();
231291
}
232292

233-
const bool inputRowsFinished = status == NUdf::EFetchStatus::Finish;
293+
const bool inputRowsFinished = LastFetchStatus == NUdf::EFetchStatus::Finish;
234294
const bool allReadsFinished = AllReadsFinished();
235295
const bool allRowsProcessed = StreamLookupWorker->AllRowsProcessed();
236296

@@ -305,14 +365,14 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
305365
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
306366
const auto& record = ev->Get()->Record;
307367

308-
309368
auto readIt = Reads.find(record.GetReadId());
310369
if (readIt == Reads.end() || readIt->second.State != EReadState::Running) {
311370
CA_LOG_D("Drop read with readId: " << record.GetReadId() << ", because it's already completed or blocked");
312371
return;
313372
}
314373

315374
auto& read = readIt->second;
375+
ui64 shardId = read.ShardId;
316376

317377
CA_LOG_D("Recv TEvReadResult (stream lookup) from ShardID=" << read.ShardId
318378
<< ", Table = " << StreamLookupWorker->GetTablePath()
@@ -369,13 +429,13 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
369429
case Ydb::StatusIds::NOT_FOUND:
370430
{
371431
StreamLookupWorker->ResetRowsProcessing(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey);
372-
read.SetFinished();
373432
CA_LOG_D("NOT_FOUND was received from tablet: " << read.ShardId << ". "
374433
<< getIssues().ToOneLineString());
434+
Reads.erase(read);
375435
return ResolveTableShards();
376436
}
377437
case Ydb::StatusIds::OVERLOADED: {
378-
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
438+
if (CheckTotalRetriesExeeded() || Reads.CheckShardRetriesExeeded(read)) {
379439
return replyError(
380440
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
381441
NYql::NDqProto::StatusIds::OVERLOADED);
@@ -386,7 +446,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
386446
return RetryTableRead(read, /*allowInstantRetry = */false);
387447
}
388448
case Ydb::StatusIds::INTERNAL_ERROR: {
389-
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
449+
if (CheckTotalRetriesExeeded() || Reads.CheckShardRetriesExeeded(read)) {
390450
return replyError(
391451
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
392452
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
@@ -405,7 +465,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
405465
read.LastSeqNo = record.GetSeqNo();
406466

407467
if (record.GetFinished()) {
408-
read.SetFinished();
468+
Reads.erase(read);
409469
} else {
410470
YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token");
411471
NKikimrTxDataShard::TReadContinuationToken continuationToken;
@@ -443,7 +503,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
443503

444504
auto guard = BindAllocator();
445505
StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{
446-
read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
506+
shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
447507
});
448508
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
449509
}
@@ -452,11 +512,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
452512
CA_LOG_D("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
453513

454514
const auto& tabletId = ev->Get()->TabletId;
455-
auto shardIt = ReadsPerShard.find(tabletId);
456-
YQL_ENSURE(shardIt != ReadsPerShard.end());
457515

458516
TVector<TReadState*> toRetry;
459-
for (auto* read : shardIt->second.Reads) {
517+
for (auto* read : Reads.GetShardReads(tabletId)) {
460518
if (read->State == EReadState::Running) {
461519
Counters->IteratorDeliveryProblems->Inc();
462520
toRetry.push_back(read);
@@ -489,27 +547,24 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
489547

490548
if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) {
491549
if (ev->Get()->InstantStart) {
492-
read.SetFinished();
493550
auto requests = StreamLookupWorker->RebuildRequest(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey, ReadId);
494551
for (auto& request : requests) {
495552
StartTableRead(read.ShardId, std::move(request));
496553
}
554+
Reads.erase(read);
497555
} else {
498556
RetryTableRead(read);
499557
}
500558
}
501559
}
502560

503-
NUdf::EFetchStatus FetchInputRows() {
561+
void FetchInputRows() {
504562
auto guard = BindAllocator();
505563

506-
NUdf::EFetchStatus status;
507564
NUdf::TUnboxedValue row;
508-
while ((status = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
565+
while ((LastFetchStatus = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
509566
StreamLookupWorker->AddInputRow(std::move(row));
510567
}
511-
512-
return status;
513568
}
514569

515570
void ProcessInputRows() {
@@ -569,9 +624,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
569624

570625
auto readId = read.Id;
571626
auto lastSeqNo = read.LastSeqNo;
572-
const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)});
573-
YQL_ENSURE(succeeded);
574-
ReadsPerShard[shardId].Reads.push_back(&readIt->second);
627+
Reads.insert(std::move(read));
575628

576629
if (auto delay = ShardTimeout()) {
577630
TlsActivationContext->Schedule(
@@ -585,11 +638,6 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
585638
return limit && TotalRetryAttempts + 1 > *limit;
586639
}
587640

588-
bool CheckShardRetriesExeeded(TReadState& failedRead) {
589-
const auto& shardState = ReadsPerShard[failedRead.ShardId];
590-
return shardState.RetryAttempts + 1 > MaxShardRetries();
591-
}
592-
593641
void RetryTableRead(TReadState& failedRead, bool allowInstantRetry = true) {
594642
CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id
595643
<< ", shardId: " << failedRead.ShardId);
@@ -600,21 +648,19 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
600648
}
601649
++TotalRetryAttempts;
602650

603-
if (CheckShardRetriesExeeded(failedRead)) {
651+
if (Reads.CheckShardRetriesExeeded(failedRead)) {
604652
StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey);
605-
failedRead.SetFinished();
653+
Reads.erase(failedRead);
606654
return ResolveTableShards();
607655
}
608-
auto& shardState = ReadsPerShard[failedRead.ShardId];
609-
++shardState.RetryAttempts;
610656

611-
auto delay = CalcDelay(shardState.RetryAttempts, allowInstantRetry);
657+
auto delay = Reads.CalcDelayForShard(failedRead, allowInstantRetry);
612658
if (delay == TDuration::Zero()) {
613-
failedRead.SetFinished();
614659
auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey, ReadId);
615660
for (auto& request : requests) {
616661
StartTableRead(failedRead.ShardId, std::move(request));
617662
}
663+
Reads.erase(failedRead);
618664
} else {
619665
CA_LOG_D("Schedule retry atempt for readId: " << failedRead.Id << " after " << delay);
620666
TlsActivationContext->Schedule(
@@ -660,13 +706,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
660706
}
661707

662708
bool AllReadsFinished() const {
663-
for (const auto& [_, read] : Reads) {
664-
if (!read.Finished()) {
665-
return false;
666-
}
667-
}
668-
669-
return true;
709+
return Reads.InFlightReads() == 0;
670710
}
671711

672712
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() {
@@ -702,8 +742,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
702742
const TMaybe<ui64> LockTxId;
703743
const TMaybe<ui32> NodeLockId;
704744
const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
705-
std::unordered_map<ui64, TReadState> Reads;
706-
std::unordered_map<ui64, TShardState> ReadsPerShard;
745+
TReads Reads;
746+
NUdf::EFetchStatus LastFetchStatus = NUdf::EFetchStatus::Yield;
707747
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
708748
const TDuration SchemeCacheRequestTimeout;
709749
NActors::TActorId SchemeCacheRequestTimeoutTimer;

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
namespace NKikimr {
1414
namespace NKqp {
1515

16+
constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500;
17+
1618
namespace {
1719
std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo,
1820
const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TOwnedTableRange& range) {
@@ -274,12 +276,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
274276
ReadResults.emplace_back(std::move(result));
275277
}
276278

279+
bool IsOverloaded() final {
280+
return false;
281+
}
282+
277283
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
278284
TReadResultStats resultStats;
279-
bool sizeLimitExceeded = false;
280285
batch.clear();
281286

282-
while (!ReadResults.empty() && !sizeLimitExceeded) {
287+
while (!ReadResults.empty() && !resultStats.SizeLimitExceeded) {
283288
auto& result = ReadResults.front();
284289
for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
285290
const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
@@ -305,10 +310,10 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
305310
}
306311

307312
if (rowSize + (i64)resultStats.ResultBytesCount > freeSpace) {
308-
sizeLimitExceeded = true;
313+
resultStats.SizeLimitExceeded = true;
309314
}
310315

311-
if (resultStats.ResultRowsCount && sizeLimitExceeded) {
316+
if (resultStats.ResultRowsCount && resultStats.SizeLimitExceeded) {
312317
row.DeleteUnreferenced();
313318
break;
314319
}
@@ -444,6 +449,10 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
444449
UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1))));
445450
}
446451

452+
bool IsOverloaded() final {
453+
return UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT;
454+
}
455+
447456
std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
448457
TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) final {
449458

@@ -718,7 +727,6 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
718727

719728
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
720729
TReadResultStats resultStats;
721-
bool sizeLimitExceeded = false;
722730
batch.clear();
723731

724732
// we should process left rows that haven't matches on the right
@@ -747,7 +755,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
747755
return ResultRowsBySeqNo.find(CurrentResultSeqNo);
748756
};
749757

750-
while (!sizeLimitExceeded) {
758+
while (!resultStats.SizeLimitExceeded) {
751759
auto resultIt = getNextResult();
752760
if (resultIt == ResultRowsBySeqNo.end()) {
753761
break;
@@ -758,7 +766,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
758766
auto& row = result.Rows[result.FirstUnprocessedRow];
759767

760768
if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
761-
sizeLimitExceeded = true;
769+
resultStats.SizeLimitExceeded = true;
762770
break;
763771
}
764772

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class TKqpStreamLookupWorker {
2727
ui64 ReadBytesCount = 0;
2828
ui64 ResultRowsCount = 0;
2929
ui64 ResultBytesCount = 0;
30+
bool SizeLimitExceeded = false;
3031

3132
void Add(const TReadResultStats& other) {
3233
ReadRowsCount += other.ReadRowsCount;
@@ -54,13 +55,14 @@ class TKqpStreamLookupWorker {
5455
virtual std::vector<NScheme::TTypeInfo> GetKeyColumnTypes() const;
5556

5657
virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0;
57-
virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
58+
virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
5859
TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) = 0;
5960
virtual TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) = 0;
6061
virtual void AddResult(TShardReadResult result) = 0;
6162
virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0;
6263
virtual bool AllRowsProcessed() = 0;
6364
virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0;
65+
virtual bool IsOverloaded() = 0;
6466

6567
protected:
6668
const NKikimrKqp::TKqpStreamLookupSettings Settings;

0 commit comments

Comments
 (0)