Skip to content

Commit 5314402

Browse files
ivanmorozov333ivanmorozov333
andauthored
fix cursors usage (#20394)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
1 parent e9dbbe4 commit 5314402

File tree

11 files changed

+51
-21
lines changed

11 files changed

+51
-21
lines changed

ydb/core/kqp/compute_actor/kqp_compute_state.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ struct TShardState: public TCommonRetriesState {
4141
std::optional<NKikimrKqp::TEvKqpScanCursor> LastCursorProto;
4242
std::optional<ui32> AvailablePacks;
4343

44+
TString CursorDebugString() const {
45+
TString strCursor = LastCursorProto ? LastCursorProto->DebugString() : TString("START");
46+
while (strCursor.find("\n") != std::string::npos) {
47+
strCursor.replace(strCursor.find("\n"), 1, " ");
48+
}
49+
return strCursor;
50+
}
4451
TString PrintLastKey(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const;
4552

4653
TShardState(const ui64 tabletId);

ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ class TShardScannerInfo {
6969
auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges, state.LastCursorProto);
7070

7171
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("tablet_id", TabletId)("generation", Generation)
72-
("info", state.ToString(keyColumnTypes))("range", DebugPrintRanges(keyColumnTypes, ranges, *AppData()->TypeRegistry))
73-
("subscribed", subscribed);
72+
("info", state.ToString(keyColumnTypes))("range", DebugPrintRanges(keyColumnTypes, ranges, *AppData()->TypeRegistry))(
73+
"subscribed", subscribed)("cursor", state.CursorDebugString());
7474

7575
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
7676
new TEvPipeCache::TEvForward(ev.release(), TabletId, !subscribed), IEventHandle::FlagTrackDelivery);

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
4444
, ShardsScanningPolicy(shardsScanningPolicy)
4545
, Counters(counters)
4646
, InFlightShards(ScanId, *this)
47-
, InFlightComputes(ComputeActorIds) {
47+
, InFlightComputes(ComputeActorIds)
48+
, IsOlapTable(Meta.GetTable().GetTableKind() == (ui32)ETableKind::Olap) {
4849
Y_UNUSED(traceId);
4950
AFL_ENSURE(!Meta.GetReads().empty());
5051
AFL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
@@ -86,7 +87,8 @@ void TKqpScanFetcherActor::Bootstrap() {
8687
for (auto&& c : ComputeActorIds) {
8788
Sender<TEvScanExchange::TEvRegisterFetcher>().SendTo(c);
8889
}
89-
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "bootstrap")("compute", ComputeActorIds.size())("shards", PendingShards.size());
90+
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "bootstrap")("self_if", SelfId())("compute", ComputeActorIds.size())(
91+
"shards", PendingShards.size());
9092
StartTableScan();
9193
Become(&TKqpScanFetcherActor::StateFunc);
9294
Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup());
@@ -301,7 +303,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
301303
}
302304

303305
const auto& tr = *AppData()->TypeRegistry;
304-
if (Meta.HasOlapProgram()) {
306+
if (IsOlapTable) {
305307
bool found = false;
306308
for (auto&& partition : keyDesc->GetPartitions()) {
307309
if (partition.ShardId != state.TabletId) {
@@ -534,8 +536,9 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
534536
state->LastCursorProto = std::move(msg.LastCursorProto);
535537
const ui64 rowsCount = msg.GetRowsCount();
536538
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action", "got EvScanData")("rows", rowsCount)("finished", msg.Finished)(
537-
"exceeded", msg.RequestedBytesLimitReached)("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount())("from", ev->Sender)(
538-
"shards remain", PendingShards.size())("in flight scans", InFlightShards.GetScansCount())(
539+
"generation", msg.Generation)("exceeded", msg.RequestedBytesLimitReached)("scan", ScanId)(
540+
"packs_to_send", InFlightComputes.GetPacksToSendCount())("from", ev->Sender)("shards remain", PendingShards.size())(
541+
"in flight scans", InFlightShards.GetScansCount())("cursor", state->CursorDebugString())(
539542
"in flight shards", InFlightShards.GetShardsCount())("delayed_for_seconds_by_ratelimiter", latency.SecondsFloat())(
540543
"tablet_id", state->TabletId)("locks", msg.LocksInfo.Locks.size())("broken locks", msg.LocksInfo.BrokenLocks.size());
541544
auto shardScanner = InFlightShards.GetShardScannerVerified(state->TabletId);

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
7272
void Bootstrap();
7373

7474
STATEFN(StateFunc) {
75+
NActors::TLogContextGuard lGuard =
76+
NActors::TLogContextBuilder::Build()("self_id", SelfId())("scan_id", ScanId)("tx_id", std::get<ui64>(TxId));
7577
try {
7678
switch (ev->GetTypeRewrite()) {
7779
hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
@@ -184,6 +186,7 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
184186

185187
TInFlightShards InFlightShards;
186188
TInFlightComputes InFlightComputes;
189+
const bool IsOlapTable = false;
187190
ui32 TotalRetries = 0;
188191

189192
std::set<ui32> TrackingNodes;

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1894,7 +1894,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
18941894
auto settings = Tests::TServerSettings(mbusport)
18951895
.SetDomainName("Root")
18961896
.SetUseRealThreads(false)
1897-
.SetNodeCount(2);
1897+
.SetNodeCount(2).SetColumnShardReaderClassName("SIMPLE");
18981898

18991899

19001900
Tests::TServer::TPtr server = new Tests::TServer(settings);
@@ -1979,8 +1979,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
19791979
ui16 mbusport = tp.GetPort(2134);
19801980
auto settings = Tests::TServerSettings(mbusport)
19811981
.SetDomainName("Root")
1982-
.SetUseRealThreads(false)
1983-
.SetNodeCount(2);
1982+
.SetUseRealThreads(false).SetNodeCount(2).SetColumnShardReaderClassName("SIMPLE");
19841983

19851984
Tests::TServer::TPtr server = new Tests::TServer(settings);
19861985

@@ -2046,8 +2045,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
20462045
ui16 mbusport = tp.GetPort(2134);
20472046
auto settings = Tests::TServerSettings(mbusport)
20482047
.SetDomainName("Root")
2049-
.SetUseRealThreads(false)
2050-
.SetNodeCount(2);
2048+
.SetUseRealThreads(false).SetNodeCount(2).SetColumnShardReaderClassName(
2049+
"SIMPLE");
20512050

20522051
Tests::TServer::TPtr server = new Tests::TServer(settings);
20532052

@@ -2108,8 +2107,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
21082107
ui16 mbusport = tp.GetPort(2134);
21092108
auto settings = Tests::TServerSettings(mbusport)
21102109
.SetDomainName("Root")
2111-
.SetUseRealThreads(false)
2112-
.SetNodeCount(2);
2110+
.SetUseRealThreads(false).SetNodeCount(2).SetColumnShardReaderClassName(
2111+
"SIMPLE");
21132112

21142113
Tests::TServer::TPtr server = new Tests::TServer(settings);
21152114
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();

ydb/core/testlib/test_client.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,10 @@ namespace Tests {
237237
TServerSettings& SetInitializeFederatedQuerySetupFactory(bool value) { InitializeFederatedQuerySetupFactory = value; return *this; }
238238
TServerSettings& SetVerbose(bool value) { Verbose = value; return *this; }
239239
TServerSettings& SetUseSectorMap(bool value) { UseSectorMap = value; return *this; }
240+
TServerSettings& SetColumnShardReaderClassName(const TString& className) {
241+
AppConfig->MutableColumnShardConfig()->SetReaderClassName(className);
242+
return *this;
243+
}
240244
TServerSettings& SetPersQueueGetReadSessionsInfoWorkerFactory(
241245
std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> factory
242246
) {

ydb/core/tx/columnshard/engines/portions/data_accessor.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,21 +421,24 @@ class TPortionDataAccessor: public TPortionMetaBase {
421421
} else {
422422
auto itSchema = schema.GetColumnIds().begin();
423423
auto itRecords = GetRecordsVerified().begin();
424+
ui32 schemaIdx = 0;
424425
while (itSchema != schema.GetColumnIds().end() && itRecords != GetRecordsVerified().end()) {
425426
if (*itSchema < itRecords->ColumnId) {
426-
NArrow::NSplitter::TColumnSerializationStat stat(*itSchema, schema.GetFieldByColumnIdVerified(*itSchema)->name());
427+
NArrow::NSplitter::TColumnSerializationStat stat(*itSchema, schema.GetFieldByIndexVerified(schemaIdx)->name());
427428
NArrow::NSplitter::TSimpleSerializationStat simpleStat(0, GetPortionInfo().GetRecordsCount(), 0);
428429
stat.Merge(simpleStat);
429430
result.AddStat(stat);
430431
++itSchema;
432+
++schemaIdx;
431433
} else if (itRecords->ColumnId < *itSchema) {
432434
++itRecords;
433435
} else {
434436
while (itRecords != GetRecordsVerified().end() && itRecords->ColumnId == *itSchema) {
435-
result.AddStat(itRecords->GetSerializationStat(schema.GetFieldByColumnIdVerified(*itSchema)->name()));
437+
result.AddStat(itRecords->GetSerializationStat(schema.GetFieldByIndexVerified(schemaIdx)->name()));
436438
++itRecords;
437439
}
438440
++itSchema;
441+
++schemaIdx;
439442
}
440443
}
441444

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,25 @@ class TSortedFullScanCollection: public ISourcesCollection {
4545
TSortedFullScanCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
4646
const std::shared_ptr<IScanCursor>& cursor)
4747
: TBase(context) {
48+
HeapSources = std::move(sources);
49+
std::make_heap(HeapSources.begin(), HeapSources.end());
4850
if (cursor && cursor->IsInitialized()) {
49-
for (auto&& i : sources) {
51+
while (HeapSources.size()) {
5052
bool usage = false;
51-
if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(i, usage)) {
53+
if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(HeapSources.front(), usage)) {
54+
std::pop_heap(HeapSources.begin(), HeapSources.end());
55+
HeapSources.pop_back();
5256
continue;
5357
}
5458
if (usage) {
55-
i.SetIsStartedByCursor();
59+
HeapSources.front().SetIsStartedByCursor();
60+
} else {
61+
std::pop_heap(HeapSources.begin(), HeapSources.end());
62+
HeapSources.pop_back();
5663
}
5764
break;
5865
}
5966
}
60-
HeapSources = std::move(sources);
61-
std::make_heap(HeapSources.begin(), HeapSources.end());
6267
}
6368
};
6469

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ TScanWithLimitCollection::TScanWithLimitCollection(
4242
}
4343
if (usage) {
4444
HeapSources.front().SetIsStartedByCursor();
45+
} else {
46+
std::pop_heap(HeapSources.begin(), HeapSources.end());
47+
HeapSources.pop_back();
4548
}
4649
break;
4750
}

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class TNotSortedCollection: public ISourcesCollection {
6767
}
6868
if (usage) {
6969
sources.front().SetIsStartedByCursor();
70+
} else {
71+
sources.pop_front();
7072
}
7173
break;
7274
}

0 commit comments

Comments
 (0)