Skip to content

Commit 7a54545

Browse files
authored
24-2: Fix unexpected read iterator stream reset (#7709)
1 parent 5d0978a commit 7a54545

File tree

4 files changed

+209
-13
lines changed

4 files changed

+209
-13
lines changed

ydb/core/tx/datashard/datashard__read_iterator.cpp

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "datashard_locks_db.h"
66
#include "probes.h"
77

8+
#include <ydb/core/base/counters.h>
89
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
910

1011
#include <ydb/library/actors/core/monotonic_provider.h>
@@ -315,6 +316,8 @@ class TReader {
315316
, Self(self)
316317
, TableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion)
317318
, FirstUnprocessedQuery(State.FirstUnprocessedQuery)
319+
, LastProcessedKey(State.LastProcessedKey)
320+
, LastProcessedKeyErased(State.LastProcessedKeyErased)
318321
{
319322
GetTimeFast(&StartTime);
320323
EndTime = StartTime;
@@ -329,10 +332,10 @@ class TReader {
329332
bool toInclusive;
330333
TSerializedCellVec keyFromCells;
331334
TSerializedCellVec keyToCells;
332-
if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) {
335+
if (LastProcessedKey) {
333336
if (!State.Reverse) {
334-
keyFromCells = TSerializedCellVec(State.LastProcessedKey);
335-
fromInclusive = State.LastProcessedKeyErased;
337+
keyFromCells = TSerializedCellVec(LastProcessedKey);
338+
fromInclusive = LastProcessedKeyErased;
336339

337340
keyToCells = range.To;
338341
toInclusive = range.ToInclusive;
@@ -341,8 +344,8 @@ class TReader {
341344
keyFromCells = range.From;
342345
fromInclusive = range.FromInclusive;
343346

344-
keyToCells = TSerializedCellVec(State.LastProcessedKey);
345-
toInclusive = State.LastProcessedKeyErased;
347+
keyToCells = TSerializedCellVec(LastProcessedKey);
348+
toInclusive = LastProcessedKeyErased;
346349
}
347350
} else {
348351
keyFromCells = range.From;
@@ -500,6 +503,7 @@ class TReader {
500503
while (FirstUnprocessedQuery < State.Request->Ranges.size()) {
501504
if (ReachedTotalRowsLimit()) {
502505
FirstUnprocessedQuery = -1;
506+
LastProcessedKey.clear();
503507
return true;
504508
}
505509

@@ -526,6 +530,7 @@ class TReader {
526530
FirstUnprocessedQuery++;
527531
else
528532
FirstUnprocessedQuery--;
533+
LastProcessedKey.clear();
529534
}
530535

531536
return true;
@@ -537,6 +542,7 @@ class TReader {
537542
while (FirstUnprocessedQuery < State.Request->Keys.size()) {
538543
if (ReachedTotalRowsLimit()) {
539544
FirstUnprocessedQuery = -1;
545+
LastProcessedKey.clear();
540546
return true;
541547
}
542548

@@ -562,6 +568,7 @@ class TReader {
562568
FirstUnprocessedQuery++;
563569
else
564570
FirstUnprocessedQuery--;
571+
LastProcessedKey.clear();
565572
}
566573

567574
return true;
@@ -727,6 +734,28 @@ class TReader {
727734
}
728735

729736
void UpdateState(TReadIteratorState& state, bool sentResult) {
737+
if (state.FirstUnprocessedQuery == FirstUnprocessedQuery &&
738+
state.LastProcessedKey && !LastProcessedKey)
739+
{
740+
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
741+
"DataShard " << Self->TabletID() << " detected unexpected reset of LastProcessedKey:"
742+
<< " ReadId# " << State.ReadId
743+
<< " LastSeqNo# " << State.SeqNo
744+
<< " LastQuery# " << State.FirstUnprocessedQuery
745+
<< " RowsRead# " << RowsRead
746+
<< " RowsProcessed# " << RowsProcessed
747+
<< " RowsSinceLastCheck# " << RowsSinceLastCheck
748+
<< " BytesInResult# " << BytesInResult
749+
<< " DeletedRowSkips# " << DeletedRowSkips
750+
<< " InvisibleRowSkips# " << InvisibleRowSkips
751+
<< " Quota.Rows# " << State.Quota.Rows
752+
<< " Quota.Bytes# " << State.Quota.Bytes
753+
<< " State.TotalRows# " << State.TotalRows
754+
<< " State.TotalRowsLimit# " << State.TotalRowsLimit
755+
<< " State.MaxRowsInResult# " << State.MaxRowsInResult);
756+
Self->IncCounterReadIteratorLastKeyReset();
757+
}
758+
730759
state.TotalRows += RowsRead;
731760
state.FirstUnprocessedQuery = FirstUnprocessedQuery;
732761
state.LastProcessedKey = LastProcessedKey;
@@ -1632,6 +1661,7 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
16321661
if (Reader->HasUnreadQueries()) {
16331662
Reader->UpdateState(state, ResultSent);
16341663
if (!state.IsExhausted()) {
1664+
state.ReadContinuePending = true;
16351665
ctx.Send(
16361666
Self->SelfId(),
16371667
new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
@@ -2282,6 +2312,15 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
22822312
Y_ASSERT(it->second);
22832313
auto& state = *it->second;
22842314

2315+
if (state.IsExhausted()) {
2316+
// iterator quota reduced and exhausted while ReadContinue was inflight
2317+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
2318+
<< ", quota exhausted while rescheduling");
2319+
state.ReadContinuePending = false;
2320+
Result.reset();
2321+
return true;
2322+
}
2323+
22852324
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
22862325
<< ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);
22872326

@@ -2394,6 +2433,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
23942433
if (Reader->Read(txc, ctx)) {
23952434
// Retry later when dependencies are resolved
23962435
if (!Reader->GetVolatileReadDependencies().empty()) {
2436+
state.ReadContinuePending = true;
23972437
Self->WaitVolatileDependenciesThenSend(
23982438
Reader->GetVolatileReadDependencies(),
23992439
Self->SelfId(),
@@ -2480,6 +2520,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
24802520
Y_ABORT_UNLESS(it->second);
24812521
auto& state = *it->second;
24822522

2523+
state.ReadContinuePending = false;
2524+
24832525
if (!Result) {
24842526
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
24852527
<< " TTxReadContinue::Execute() finished without Result, aborting");
@@ -2527,14 +2569,14 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
25272569
}
25282570

25292571
if (Reader->HasUnreadQueries()) {
2530-
Y_ASSERT(it->second);
2531-
auto& state = *it->second;
2572+
bool wasExhausted = state.IsExhausted();
25322573
Reader->UpdateState(state, useful);
25332574
if (!state.IsExhausted()) {
2575+
state.ReadContinuePending = true;
25342576
ctx.Send(
25352577
Self->SelfId(),
25362578
new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
2537-
} else {
2579+
} else if (!wasExhausted) {
25382580
Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
25392581
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
25402582
<< " read iterator# " << ReadId << " exhausted");
@@ -2803,14 +2845,19 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
28032845
bool wasExhausted = state.IsExhausted();
28042846
state.UpQuota(
28052847
record.GetSeqNo(),
2806-
record.GetMaxRows(),
2807-
record.GetMaxBytes());
2848+
record.HasMaxRows() ? record.GetMaxRows() : Max<ui64>(),
2849+
record.HasMaxBytes() ? record.GetMaxBytes() : Max<ui64>());
28082850

28092851
if (wasExhausted && !state.IsExhausted()) {
28102852
DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
2811-
ctx.Send(
2812-
SelfId(),
2813-
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
2853+
if (!state.ReadContinuePending) {
2854+
state.ReadContinuePending = true;
2855+
ctx.Send(
2856+
SelfId(),
2857+
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
2858+
}
2859+
} else if (!wasExhausted && state.IsExhausted()) {
2860+
IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
28142861
}
28152862

28162863
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck for read iterator# " << readId
@@ -2939,6 +2986,16 @@ void TDataShard::UnsubscribeReadIteratorSessions(const TActorContext& ctx) {
29392986
ReadIteratorSessions.clear();
29402987
}
29412988

2989+
void TDataShard::IncCounterReadIteratorLastKeyReset() {
2990+
if (!CounterReadIteratorLastKeyReset) {
2991+
CounterReadIteratorLastKeyReset = GetServiceCounters(AppData()->Counters, "tablets")
2992+
->GetSubgroup("type", "DataShard")
2993+
->GetSubgroup("category", "app")
2994+
->GetCounter("DataShard/ReadIteratorLastKeyReset", true);
2995+
}
2996+
++*CounterReadIteratorLastKeyReset;
2997+
}
2998+
29422999
} // NKikimr::NDataShard
29433000

29443001
template<>

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3273,6 +3273,10 @@ class TDataShard
32733273
bool AllowCancelROwithReadsets() const;
32743274

32753275
void ResolveTablePath(const TActorContext &ctx);
3276+
3277+
public:
3278+
NMonitoring::TDynamicCounters::TCounterPtr CounterReadIteratorLastKeyReset;
3279+
void IncCounterReadIteratorLastKeyReset();
32763280
};
32773281

32783282
NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code);

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4164,6 +4164,140 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
41644164
"result2: " << result2);
41654165
}
41664166

4167+
template<class TEvType>
4168+
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
4169+
public:
4170+
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {})
4171+
: Runtime(runtime)
4172+
, Condition(std::move(condition))
4173+
, Holder(Runtime.AddObserver<TEvType>(
4174+
[this](typename TEvType::TPtr& ev) {
4175+
this->Process(ev);
4176+
}))
4177+
{}
4178+
4179+
TBlockEvents& Unblock(size_t count = -1) {
4180+
while (!this->empty() && count > 0) {
4181+
auto& ev = this->front();
4182+
IEventHandle* ptr = ev.Get();
4183+
UnblockedOnce.insert(ptr);
4184+
Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true);
4185+
this->pop_front();
4186+
--count;
4187+
}
4188+
return *this;
4189+
}
4190+
4191+
void Stop() {
4192+
UnblockedOnce.clear();
4193+
Holder.Remove();
4194+
}
4195+
4196+
private:
4197+
void Process(typename TEvType::TPtr& ev) {
4198+
IEventHandle* ptr = ev.Get();
4199+
auto it = UnblockedOnce.find(ptr);
4200+
if (it != UnblockedOnce.end()) {
4201+
UnblockedOnce.erase(it);
4202+
return;
4203+
}
4204+
4205+
if (Condition && !Condition(ev)) {
4206+
return;
4207+
}
4208+
4209+
this->emplace_back(std::move(ev));
4210+
}
4211+
4212+
private:
4213+
TTestActorRuntime& Runtime;
4214+
std::function<bool(typename TEvType::TPtr&)> Condition;
4215+
TTestActorRuntime::TEventObserverHolder Holder;
4216+
THashSet<IEventHandle*> UnblockedOnce;
4217+
};
4218+
4219+
Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) {
4220+
TPortManager pm;
4221+
TServerSettings serverSettings(pm.GetPort(2134));
4222+
serverSettings.SetDomainName("Root")
4223+
.SetUseRealThreads(false);
4224+
TServer::TPtr server = new TServer(serverSettings);
4225+
4226+
auto& runtime = *server->GetRuntime();
4227+
auto sender = runtime.AllocateEdgeActor();
4228+
4229+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
4230+
4231+
InitRoot(server, sender);
4232+
4233+
TDisableDataShardLogBatching disableDataShardLogBatching;
4234+
4235+
CreateShardedTable(server, sender, "/Root", "table-1", 1);
4236+
4237+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);");
4238+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);");
4239+
runtime.SimulateSleep(TDuration::Seconds(1));
4240+
4241+
auto forceSmallChunks = runtime.AddObserver<TEvDataShard::TEvRead>(
4242+
[&](TEvDataShard::TEvRead::TPtr& ev) {
4243+
auto* msg = ev->Get();
4244+
// Force chunks of at most 3 rows
4245+
msg->Record.SetMaxRowsInResult(3);
4246+
});
4247+
4248+
TBlockEvents<TEvDataShard::TEvReadAck> blockedAcks(runtime);
4249+
TBlockEvents<TEvDataShard::TEvReadResult> blockedResults(runtime);
4250+
TBlockEvents<TEvDataShard::TEvReadContinue> blockedContinue(runtime);
4251+
4252+
auto waitFor = [&](const TString& description, const auto& condition, size_t count = 1) {
4253+
while (!condition()) {
4254+
UNIT_ASSERT_C(count > 0, "... failed to wait for " << description);
4255+
Cerr << "... waiting for " << description << Endl;
4256+
TDispatchOptions options;
4257+
options.CustomFinalCondition = [&]() {
4258+
return condition();
4259+
};
4260+
runtime.DispatchEvents(options);
4261+
--count;
4262+
}
4263+
};
4264+
4265+
auto readFuture = KqpSimpleSend(runtime, "SELECT key, value FROM `/Root/table-1` ORDER BY key LIMIT 7");
4266+
waitFor("first TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
4267+
waitFor("first TEvReadResult", [&]{ return blockedResults.size() >= 1; });
4268+
4269+
blockedContinue.Unblock(1);
4270+
waitFor("second TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
4271+
waitFor("second TEvReadResult", [&]{ return blockedResults.size() >= 2; });
4272+
4273+
// We need both results to arrive without pauses
4274+
blockedResults.Unblock();
4275+
4276+
waitFor("both TEvReadAcks", [&]{ return blockedAcks.size() >= 2; });
4277+
4278+
// Unblock the first TEvReadAck and then pending TEvReadContinue
4279+
blockedAcks.Unblock(1);
4280+
blockedContinue.Unblock(1);
4281+
4282+
// Give it some time to trigger the bug
4283+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
4284+
4285+
// Stop blocking everything
4286+
blockedAcks.Unblock().Stop();
4287+
blockedResults.Unblock().Stop();
4288+
blockedContinue.Unblock().Stop();
4289+
4290+
UNIT_ASSERT_VALUES_EQUAL(
4291+
FormatResult(AwaitResponse(runtime, std::move(readFuture))),
4292+
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
4293+
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
4294+
"{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
4295+
"{ items { uint32_value: 4 } items { uint32_value: 40 } }, "
4296+
"{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
4297+
"{ items { uint32_value: 6 } items { uint32_value: 60 } }, "
4298+
"{ items { uint32_value: 7 } items { uint32_value: 70 } }");
4299+
}
4300+
41674301
}
41684302

41694303
} // namespace NKikimr

ydb/core/tx/datashard/read_iterator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ struct TReadIteratorState {
205205
TActorId SessionId;
206206
TMonotonic StartTs;
207207
bool IsFinished = false;
208+
bool ReadContinuePending = false;
208209

209210
// note that we send SeqNo's starting from 1
210211
ui64 SeqNo = 0;

0 commit comments

Comments
 (0)