Skip to content

Commit c11bbbe

Browse files
authored
add columnshard traversal in statistics aggregator (#7004)
1 parent af4bb4b commit c11bbbe

18 files changed

+559
-36
lines changed

ydb/core/protos/counters_statistics_aggregator.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,7 @@ enum ETxTypes {
1818
TXTYPE_SAVE_QUERY_RESPONSE = 8 [(TxTypeOpts) = {Name: "TxSaveQueryResponse"}];
1919
TXTYPE_SCHEDULE_SCAN = 9 [(TxTypeOpts) = {Name: "TxScheduleScan"}];
2020
TXTYPE_DELETE_QUERY_RESPONSE = 10 [(TxTypeOpts) = {Name: "TxDeleteQueryResponse"}];
21+
TXTYPE_AGGR_STAT_RESPONSE = 11 [(TxTypeOpts) = {Name: "TxAggregateStatisticsResponse"}];
22+
TXTYPE_RESPONSE_TABLET_DISTRIBUTION = 12 [(TxTypeOpts) = {Name: "TxResponseTabletDistribution"}];
23+
TXTYPE_ACK_TIMEOUT = 13 [(TxTypeOpts) = {Name: "TxAckTimeout"}];
2124
}

ydb/core/protos/statistics.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ message TPathEntry {
1313
optional NKikimrProto.TPathID PathId = 1;
1414
optional uint64 RowCount = 2;
1515
optional uint64 BytesSize = 3;
16+
optional bool IsColumnTable = 4;
1617
}
1718

1819
message TSchemeShardStats {

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -396,15 +396,26 @@ size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds,
396396
}
397397

398398
// Reacts to a pipe-cache delivery problem for the tablet named in the event.
// Diff-view note: lines preceded by an "N-" marker are the removed pre-change
// version; lines preceded by "N+" (or a doubled number) form the new version.
// New behaviour: does nothing when ScanTableId.PathId is unset. For a column
// table, a problem with HiveId re-schedules TEvRequestDistribution after
// HiveRetryInterval and any other tablet is logged as critical. For a
// non-column table, Resolve() is re-run only when the failing tablet is the
// DataShardId of the front entry of ShardRanges.
void TStatisticsAggregator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
399-
auto tabletId = ev->Get()->TabletId;
400-
if (ShardRanges.empty()) {
399+
// ScanTableId.PathId is reset to a default TPathId in ResetScanState and set in StartScan.
if (!ScanTableId.PathId) {
401400
return;
402401
}
403-
auto& range = ShardRanges.front();
404-
if (tabletId != range.DataShardId) {
405-
return;
402+
auto tabletId = ev->Get()->TabletId;
403+
if (IsColumnTable) {
404+
if (tabletId == HiveId) {
405+
// Retry the tablet distribution request later instead of immediately.
Schedule(HiveRetryInterval, new TEvPrivate::TEvRequestDistribution);
406+
} else {
407+
// Only the Hive is expected to be reached through the pipe cache on this path.
SA_LOG_CRIT("[" << TabletID() << "] TEvDeliveryProblem with unexpected tablet " << tabletId);
408+
}
409+
} else {
410+
if (ShardRanges.empty()) {
411+
return;
412+
}
413+
auto& range = ShardRanges.front();
414+
// Ignore problems reported for any tablet other than the front shard range's datashard.
if (tabletId != range.DataShardId) {
415+
return;
416+
}
417+
Resolve();
406418
}
407-
Resolve();
408419
}
409420

410421
void TStatisticsAggregator::Handle(TEvStatistics::TEvStatTableCreationResponse::TPtr&) {
@@ -439,6 +450,30 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvGetScanStatus::TPtr& ev) {
439450
Send(ev->Sender, response.release(), 0, ev->Cookie);
440451
}
441452

453+
// Handles the private TEvResolve event by running Resolve(); the event payload is not used.
void TStatisticsAggregator::Handle(TEvPrivate::TEvResolve::TPtr&) {
454+
Resolve();
455+
}
456+
457+
// Builds a TEvHive::TEvRequestTabletDistribution that lists every tablet id in
// TabletsForReqDistribution and forwards it to HiveId through the per-node
// pipe cache. Each call also increments HiveRequestRound.
// NOTE(review): HiveRequestRound is not compared against MaxHiveRequestRoundCount
// in this handler — confirm the retry limit is enforced elsewhere.
void TStatisticsAggregator::Handle(TEvPrivate::TEvRequestDistribution::TPtr&) {
458+
++HiveRequestRound;
459+
460+
auto reqDistribution = std::make_unique<TEvHive::TEvRequestTabletDistribution>();
461+
// Pre-size the repeated field to the number of tablet ids about to be added.
reqDistribution->Record.MutableTabletIds()->Reserve(TabletsForReqDistribution.size());
462+
for (auto& tablet : TabletsForReqDistribution) {
463+
reqDistribution->Record.AddTabletIds(tablet);
464+
}
465+
466+
// Ownership of the request is released to the TEvForward event.
// NOTE(review): the trailing `true` presumably subscribes to delivery problems — confirm against TEvForward.
Send(MakePipePerNodeCacheID(false),
467+
new TEvPipeCache::TEvForward(reqDistribution.release(), HiveId, true));
468+
}
469+
470+
// Answers a keep-alive with a TEvAggregateKeepAliveAck that echoes the round
// from the request, then schedules a TEvAckTimeout after KeepAliveTimeout.
void TStatisticsAggregator::Handle(TEvStatistics::TEvAggregateKeepAlive::TPtr& ev) {
471+
auto ack = std::make_unique<TEvStatistics::TEvAggregateKeepAliveAck>();
472+
ack->Record.SetRound(ev->Get()->Record.GetRound());
473+
Send(ev->Sender, ack.release());
474+
// KeepAliveSeqNo is bumped before it is stored in the timeout event, so every
// keep-alive produces a timeout carrying a newer sequence number.
Schedule(KeepAliveTimeout, new TEvPrivate::TEvAckTimeout(++KeepAliveSeqNo));
475+
}
476+
442477
void TStatisticsAggregator::InitializeStatisticsTable() {
443478
if (!EnableColumnStatistics) {
444479
return;
@@ -460,6 +495,8 @@ void TStatisticsAggregator::Navigate() {
460495
}
461496

462497
void TStatisticsAggregator::Resolve() {
498+
++ResolveRound;
499+
463500
TVector<TCell> plusInf;
464501
TTableRange range(StartKey.GetCells(), true, plusInf, true, false);
465502
auto keyDesc = MakeHolder<TKeyDesc>(
@@ -500,6 +537,9 @@ void TStatisticsAggregator::SaveStatisticsToTable() {
500537
std::vector<ui32> columnTags;
501538
std::vector<TString> data;
502539
auto count = CountMinSketches.size();
540+
if (count == 0) {
541+
return;
542+
}
503543
columnTags.reserve(count);
504544
data.reserve(count);
505545

@@ -533,28 +573,44 @@ void TStatisticsAggregator::ScheduleNextScan(NIceDb::TNiceDb& db) {
533573
auto* operation = ScanOperations.Front();
534574
ReplyToActorIds.swap(operation->ReplyToActorIds);
535575

536-
StartScan(db, operation->PathId);
537-
576+
bool doStartScan = true;
577+
bool isColumnTable = false;
578+
auto pathId = operation->PathId;
579+
auto itPath = ScanTables.find(pathId);
580+
if (itPath != ScanTables.end()) {
581+
isColumnTable = itPath->second.IsColumnTable;
582+
} else {
583+
doStartScan = false;
584+
}
585+
if (doStartScan) {
586+
StartScan(db, pathId, isColumnTable);
587+
}
538588
db.Table<Schema::ScanOperations>().Key(operation->OperationId).Delete();
539589
ScanOperations.PopFront();
540-
ScanOperationsByPathId.erase(operation->PathId);
590+
ScanOperationsByPathId.erase(pathId);
541591
return;
542592
}
543593
if (ScanTablesByTime.Empty()) {
544594
return;
545595
}
546596
auto* topTable = ScanTablesByTime.Top();
547-
auto now = TInstant::Now();
548-
auto updateTime = topTable->LastUpdateTime;
549-
if (now - updateTime < ScanIntervalTime) {
597+
if (TInstant::Now() < topTable->LastUpdateTime + ScanIntervalTime) {
598+
return;
599+
}
600+
bool isColumnTable = false;
601+
auto itPath = ScanTables.find(topTable->PathId);
602+
if (itPath != ScanTables.end()) {
603+
isColumnTable = itPath->second.IsColumnTable;
604+
} else {
550605
return;
551606
}
552-
StartScan(db, topTable->PathId);
607+
StartScan(db, topTable->PathId, isColumnTable);
553608
}
554609

555-
void TStatisticsAggregator::StartScan(NIceDb::TNiceDb& db, TPathId pathId) {
610+
void TStatisticsAggregator::StartScan(NIceDb::TNiceDb& db, TPathId pathId, bool isColumnTable) {
556611
ScanTableId.PathId = pathId;
557612
ScanStartTime = TInstant::Now();
613+
IsColumnTable = isColumnTable;
558614
PersistCurrentScan(db);
559615

560616
StartKey = TSerializedCellVec();
@@ -590,6 +646,7 @@ void TStatisticsAggregator::PersistCurrentScan(NIceDb::TNiceDb& db) {
590646
PersistSysParam(db, Schema::SysParam_ScanTableOwnerId, ToString(ScanTableId.PathId.OwnerId));
591647
PersistSysParam(db, Schema::SysParam_ScanTableLocalPathId, ToString(ScanTableId.PathId.LocalPathId));
592648
PersistSysParam(db, Schema::SysParam_ScanStartTime, ToString(ScanStartTime.MicroSeconds()));
649+
PersistSysParam(db, Schema::SysParam_IsColumnTable, ToString(IsColumnTable));
593650
}
594651

595652
void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
@@ -600,6 +657,10 @@ void TStatisticsAggregator::PersistLastScanOperationId(NIceDb::TNiceDb& db) {
600657
PersistSysParam(db, Schema::SysParam_LastScanOperationId, ToString(LastScanOperationId));
601658
}
602659

660+
// Writes the current GlobalTraversalRound, converted to a string, into the
// SysParam_GlobalTraversalRound system parameter of the given local database.
void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
661+
PersistSysParam(db, Schema::SysParam_GlobalTraversalRound, ToString(GlobalTraversalRound));
662+
}
663+
603664
void TStatisticsAggregator::ResetScanState(NIceDb::TNiceDb& db) {
604665
ScanTableId.PathId = TPathId();
605666
ScanStartTime = TInstant::MicroSeconds(0);
@@ -620,6 +681,12 @@ void TStatisticsAggregator::ResetScanState(NIceDb::TNiceDb& db) {
620681
KeyColumnTypes.clear();
621682
Columns.clear();
622683
ColumnNames.clear();
684+
685+
TabletsForReqDistribution.clear();
686+
687+
ResolveRound = 0;
688+
HiveRequestRound = 0;
689+
TraversalRound = 0;
623690
}
624691

625692
template <typename T, typename S>

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/protos/statistics.pb.h>
66
#include <ydb/core/protos/counters_statistics_aggregator.pb.h>
77

8+
#include <ydb/core/base/hive.h>
89
#include <ydb/core/base/tablet_pipe.h>
910
#include <ydb/core/base/tablet_pipecache.h>
1011
#include <ydb/core/statistics/common.h>
@@ -51,6 +52,9 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
5152
struct TTxSaveQueryResponse;
5253
struct TTxScheduleScan;
5354
struct TTxDeleteQueryResponse;
55+
struct TTxAggregateStatisticsResponse;
56+
struct TTxResponseTabletDistribution;
57+
struct TTxAckTimeout;
5458

5559
struct TEvPrivate {
5660
enum EEv {
@@ -59,6 +63,9 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
5963
EvProcessUrgent,
6064
EvPropagateTimeout,
6165
EvScheduleScan,
66+
EvRequestDistribution,
67+
EvResolve,
68+
EvAckTimeout,
6269

6370
EvEnd
6471
};
@@ -68,6 +75,15 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
6875
struct TEvProcessUrgent : public TEventLocal<TEvProcessUrgent, EvProcessUrgent> {};
6976
struct TEvPropagateTimeout : public TEventLocal<TEvPropagateTimeout, EvPropagateTimeout> {};
7077
struct TEvScheduleScan : public TEventLocal<TEvScheduleScan, EvScheduleScan> {};
78+
struct TEvRequestDistribution : public TEventLocal<TEvRequestDistribution, EvRequestDistribution> {};
79+
struct TEvResolve : public TEventLocal<TEvResolve, EvResolve> {};
80+
81+
// Local event carrying the sequence number it was created with. The
// TEvAckTimeout handler compares SeqNo with KeepAliveSeqNo to drop timeouts
// that have been superseded.
struct TEvAckTimeout : public TEventLocal<TEvAckTimeout, EvAckTimeout> {
82+
// Sequence number supplied at construction.
size_t SeqNo = 0;
83+
explicit TEvAckTimeout(size_t seqNo) {
84+
SeqNo = seqNo;
85+
}
86+
};
7187
};
7288

7389
private:
@@ -114,6 +130,12 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
114130
void Handle(TEvStatistics::TEvDeleteStatisticsQueryResponse::TPtr& ev);
115131
void Handle(TEvPrivate::TEvScheduleScan::TPtr& ev);
116132
void Handle(TEvStatistics::TEvGetScanStatus::TPtr& ev);
133+
void Handle(TEvHive::TEvResponseTabletDistribution::TPtr& ev);
134+
void Handle(TEvStatistics::TEvAggregateStatisticsResponse::TPtr& ev);
135+
void Handle(TEvPrivate::TEvResolve::TPtr& ev);
136+
void Handle(TEvPrivate::TEvRequestDistribution::TPtr& ev);
137+
void Handle(TEvStatistics::TEvAggregateKeepAlive::TPtr& ev);
138+
void Handle(TEvPrivate::TEvAckTimeout::TPtr& ev);
117139

118140
void InitializeStatisticsTable();
119141
void Navigate();
@@ -126,13 +148,13 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
126148
void PersistCurrentScan(NIceDb::TNiceDb& db);
127149
void PersistStartKey(NIceDb::TNiceDb& db);
128150
void PersistLastScanOperationId(NIceDb::TNiceDb& db);
151+
void PersistGlobalTraversalRound(NIceDb::TNiceDb& db);
129152

130153
void ResetScanState(NIceDb::TNiceDb& db);
131154
void ScheduleNextScan(NIceDb::TNiceDb& db);
132-
void StartScan(NIceDb::TNiceDb& db, TPathId pathId);
155+
void StartScan(NIceDb::TNiceDb& db, TPathId pathId, bool isColumnTable);
133156
void FinishScan(NIceDb::TNiceDb& db);
134157

135-
136158
STFUNC(StateInit) {
137159
StateInitImpl(ev, SelfId());
138160
}
@@ -164,6 +186,12 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
164186
hFunc(TEvStatistics::TEvDeleteStatisticsQueryResponse, Handle);
165187
hFunc(TEvPrivate::TEvScheduleScan, Handle);
166188
hFunc(TEvStatistics::TEvGetScanStatus, Handle);
189+
hFunc(TEvHive::TEvResponseTabletDistribution, Handle);
190+
hFunc(TEvStatistics::TEvAggregateStatisticsResponse, Handle);
191+
hFunc(TEvPrivate::TEvResolve, Handle);
192+
hFunc(TEvPrivate::TEvRequestDistribution, Handle);
193+
hFunc(TEvStatistics::TEvAggregateKeepAlive, Handle);
194+
hFunc(TEvPrivate::TEvAckTimeout, Handle);
167195

168196
default:
169197
if (!HandleDefaultEvents(ev, SelfId())) {
@@ -214,6 +242,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
214242
//
215243

216244
TTableId ScanTableId; // stored in local db
245+
bool IsColumnTable = false; // stored in local db
217246
std::unordered_set<TActorId> ReplyToActorIds;
218247

219248
bool IsStatisticsTableCreated = false;
@@ -241,6 +270,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
241270
TPathId PathId;
242271
ui64 SchemeShardId = 0;
243272
TInstant LastUpdateTime;
273+
bool IsColumnTable = false;
244274

245275
size_t HeapIndexByTime = -1;
246276

@@ -275,6 +305,24 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
275305
ui64 LastScanOperationId = 0; // stored in local db
276306

277307
TInstant ScanStartTime;
308+
309+
size_t ResolveRound = 0;
310+
static constexpr size_t MaxResolveRoundCount = 5;
311+
static constexpr TDuration ResolveRetryInterval = TDuration::Seconds(1);
312+
313+
ui64 HiveId = 0;
314+
std::unordered_set<ui64> TabletsForReqDistribution;
315+
316+
size_t HiveRequestRound = 0;
317+
static constexpr size_t MaxHiveRequestRoundCount = 5;
318+
static constexpr TDuration HiveRetryInterval = TDuration::Seconds(1);
319+
320+
size_t TraversalRound = 0;
321+
static constexpr size_t MaxTraversalRoundCount = 5;
322+
size_t GlobalTraversalRound = 1; // stored in local db
323+
324+
size_t KeepAliveSeqNo = 0;
325+
static constexpr TDuration KeepAliveTimeout = TDuration::Seconds(3);
278326
};
279327

280328
} // NKikimr::NStat

ydb/core/statistics/aggregator/schema.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ struct TAggregatorSchema : NIceDb::Schema {
3434
struct LocalPathId : Column<2, NScheme::NTypeIds::Uint64> {};
3535
struct LastUpdateTime : Column<3, NScheme::NTypeIds::Timestamp> {};
3636
struct SchemeShardId : Column<4, NScheme::NTypeIds::Uint64> {};
37+
struct IsColumnTable : Column<5, NScheme::NTypeIds::Bool> {};
3738

3839
using TKey = TableKey<OwnerId, LocalPathId>;
3940
using TColumns = TableColumns<
4041
OwnerId,
4142
LocalPathId,
4243
LastUpdateTime,
43-
SchemeShardId
44+
SchemeShardId,
45+
IsColumnTable
4446
>;
4547
};
4648

@@ -76,6 +78,8 @@ struct TAggregatorSchema : NIceDb::Schema {
7678
static constexpr ui64 SysParam_ScanTableLocalPathId = 4;
7779
static constexpr ui64 SysParam_ScanStartTime = 5;
7880
static constexpr ui64 SysParam_LastScanOperationId = 6;
81+
static constexpr ui64 SysParam_IsColumnTable = 7;
82+
static constexpr ui64 SysParam_GlobalTraversalRound = 8;
7983
};
8084

8185
} // NKikimr::NStat
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#include "aggregator_impl.h"
2+
3+
namespace NKikimr::NStat {
4+
5+
// Transaction run when a keep-alive ack timeout fires. Execute only logs and
// reports success; Complete logs and sends TEvRequestDistribution to the
// aggregator itself.
struct TStatisticsAggregator::TTxAckTimeout : public TTxBase {
6+
explicit TTxAckTimeout(TSelf* self)
7+
: TTxBase(self)
8+
{}
9+
10+
TTxType GetTxType() const override { return TXTYPE_ACK_TIMEOUT; }
11+
12+
// Logs at debug level and returns true; txc is not touched here.
// NOTE(review): txc is a named but unused parameter — confirm this does not trip unused-parameter warnings.
bool Execute(TTransactionContext& txc, const TActorContext&) override {
13+
SA_LOG_D("[" << Self->TabletID() << "] TTxAckTimeout::Execute");
14+
return true;
15+
}
16+
17+
// Logs at debug level, then asks the aggregator to request tablet distribution again.
void Complete(const TActorContext& ctx) override {
18+
SA_LOG_D("[" << Self->TabletID() << "] TTxAckTimeout::Complete");
19+
20+
ctx.Send(Self->SelfId(), new TEvPrivate::TEvRequestDistribution);
21+
}
22+
};
23+
24+
// Handles a scheduled keep-alive timeout. Events whose SeqNo is lower than the
// current KeepAliveSeqNo are ignored; otherwise TTxAckTimeout is executed.
void TStatisticsAggregator::Handle(TEvPrivate::TEvAckTimeout::TPtr& ev) {
25+
// A newer keep-alive has bumped KeepAliveSeqNo since this timeout was scheduled.
if (ev->Get()->SeqNo < KeepAliveSeqNo) {
26+
return;
27+
}
28+
// No newer keep-alive superseded this timeout: run the timeout transaction.
29+
Execute(new TTxAckTimeout(this), TActivationContext::AsActorContext());
30+
}
31+
32+
} // NKikimr::NStat

0 commit comments

Comments
 (0)