Skip to content

Commit 0a33939

Browse files
authored
Statistics: Send TEvAnalyzeTable to shards (#7880)
1 parent 37e61ab commit 0a33939

15 files changed

+449
-102
lines changed

ydb/core/protos/counters_statistics_aggregator.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,6 @@ enum ETxTypes {
2020
TXTYPE_AGGR_STAT_RESPONSE = 10 [(TxTypeOpts) = {Name: "TxAggregateStatisticsResponse"}];
2121
TXTYPE_RESPONSE_TABLET_DISTRIBUTION = 11 [(TxTypeOpts) = {Name: "TxResponseTabletDistribution"}];
2222
TXTYPE_ACK_TIMEOUT = 12 [(TxTypeOpts) = {Name: "TxAckTimeout"}];
23+
TXTYPE_ANALYZE_TABLE_REQUEST = 13 [(TxTypeOpts) = {Name: "TxAnalyzeTableRequest"}];
24+
TXTYPE_ANALYZE_TABLE_RESPONSE = 14 [(TxTypeOpts) = {Name: "TxAnalyzeTableResponse"}];
2325
}

ydb/core/protos/statistics.proto

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,16 @@ message TEvAnalyzeStatusResponse {
111111

112112
// SA -> Shard
113113
message TEvAnalyzeTable {
114-
optional TTable Table = 1; // analyzed table
115-
repeated EColumnStatisticType Types = 2; // list of statistics types requested. Empty means asking for all available.
114+
optional bytes OperationId = 1; // unique identifier to match response item
115+
optional TTable Table = 2; // analyzed table
116+
repeated EColumnStatisticType Types = 3; // list of statistics types requested. Empty means asking for all available.
116117
}
117118

118119
// Shard -> SA
119120
message TEvAnalyzeTableResponse {
120-
optional NKikimrProto.TPathID PathId = 1;
121+
optional bytes OperationId = 1;
122+
optional NKikimrProto.TPathID PathId = 2;
123+
optional fixed64 ShardTabletId = 3;
121124
}
122125

123126

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 100 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds,
398398
}
399399

400400
void TStatisticsAggregator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
401-
if (!TraversalTableId.PathId) {
401+
if (!TraversalPathId) {
402402
return;
403403
}
404404
auto tabletId = ev->Get()->TabletId;
@@ -461,11 +461,7 @@ void TStatisticsAggregator::Handle(TEvPrivate::TEvRequestDistribution::TPtr&) {
461461
++HiveRequestRound;
462462

463463
auto reqDistribution = std::make_unique<TEvHive::TEvRequestTabletDistribution>();
464-
reqDistribution->Record.MutableTabletIds()->Reserve(TabletsForReqDistribution.size());
465-
for (auto& tablet : TabletsForReqDistribution) {
466-
reqDistribution->Record.AddTabletIds(tablet);
467-
}
468-
464+
reqDistribution->Record.MutableTabletIds()->Add(TabletsForReqDistribution.begin(), TabletsForReqDistribution.end());
469465
Send(MakePipePerNodeCacheID(false),
470466
new TEvPipeCache::TEvForward(reqDistribution.release(), HiveId, true));
471467
}
@@ -485,9 +481,13 @@ void TStatisticsAggregator::InitializeStatisticsTable() {
485481
}
486482

487483
void TStatisticsAggregator::Navigate() {
484+
Y_ABORT_UNLESS(NavigateType == ENavigateType::Traversal && !NavigateAnalyzeOperationId
485+
|| NavigateType == ENavigateType::Analyze && NavigateAnalyzeOperationId);
486+
Y_ABORT_UNLESS(NavigatePathId);
487+
488488
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
489489
TNavigate::TEntry entry;
490-
entry.TableId = TraversalTableId;
490+
entry.TableId = NavigatePathId;
491491
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
492492
entry.Operation = TNavigate::OpTable;
493493

@@ -498,12 +498,16 @@ void TStatisticsAggregator::Navigate() {
498498
}
499499

500500
void TStatisticsAggregator::Resolve() {
501+
Y_ABORT_UNLESS(NavigateType == ENavigateType::Traversal && !NavigateAnalyzeOperationId
502+
|| NavigateType == ENavigateType::Analyze && NavigateAnalyzeOperationId);
503+
Y_ABORT_UNLESS(NavigatePathId);
504+
501505
++ResolveRound;
502506

503507
TVector<TCell> plusInf;
504508
TTableRange range(TraversalStartKey.GetCells(), true, plusInf, true, false);
505509
auto keyDesc = MakeHolder<TKeyDesc>(
506-
TraversalTableId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, Columns);
510+
NavigatePathId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, Columns);
507511

508512
auto request = std::make_unique<NSchemeCache::TSchemeCacheRequest>();
509513
request->ResultSet.emplace_back(std::move(keyDesc));
@@ -521,8 +525,8 @@ void TStatisticsAggregator::ScanNextDatashardRange() {
521525
auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
522526
auto& record = request->Record;
523527
auto* path = record.MutableTable()->MutablePathId();
524-
path->SetOwnerId(TraversalTableId.PathId.OwnerId);
525-
path->SetLocalId(TraversalTableId.PathId.LocalPathId);
528+
path->SetOwnerId(TraversalPathId.OwnerId);
529+
path->SetLocalId(TraversalPathId.LocalPathId);
526530
record.SetStartKey(TraversalStartKey.GetBuffer());
527531

528532
Send(MakePipePerNodeCacheID(false),
@@ -557,7 +561,7 @@ void TStatisticsAggregator::SaveStatisticsToTable() {
557561
data.push_back(strSketch);
558562
}
559563

560-
Register(CreateSaveStatisticsQuery(TraversalTableId.PathId, EStatType::COUNT_MIN_SKETCH,
564+
Register(CreateSaveStatisticsQuery(TraversalPathId, EStatType::COUNT_MIN_SKETCH,
561565
std::move(columnTags), std::move(data)));
562566
}
563567

@@ -569,42 +573,75 @@ void TStatisticsAggregator::DeleteStatisticsFromTable() {
569573

570574
PendingDeleteStatistics = false;
571575

572-
Register(CreateDeleteStatisticsQuery(TraversalTableId.PathId));
576+
Register(CreateDeleteStatisticsQuery(TraversalPathId));
573577
}
574578

575-
void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
576-
if (!IsSchemeshardSeen) {
577-
SA_LOG_T("[" << TabletID() << "] No info from schemeshard");
579+
void TStatisticsAggregator::ScheduleNextAnalyze(NIceDb::TNiceDb& db) {
580+
Y_UNUSED(db);
581+
if (ForceTraversals.empty()) {
582+
SA_LOG_T("[" << TabletID() << "] ScheduleNextAnalyze. Empty ForceTraversals");
578583
return;
584+
}
585+
SA_LOG_D("[" << TabletID() << "] ScheduleNextAnalyze");
586+
587+
for (TForceTraversalOperation& operation : ForceTraversals) {
588+
for (TForceTraversalTable& operationTable : operation.Tables) {
589+
if (operationTable.Status == TForceTraversalTable::EStatus::None) {
590+
std::optional<bool> isColumnTable = IsColumnTable(operationTable.PathId);
591+
if (!isColumnTable) {
592+
ForceTraversalOperationId = operation.OperationId;
593+
TraversalPathId = operationTable.PathId;
594+
DeleteStatisticsFromTable();
595+
return;
596+
}
597+
598+
if (*isColumnTable) {
599+
NavigateAnalyzeOperationId = operation.OperationId;
600+
NavigatePathId = operationTable.PathId;
601+
Navigate();
602+
return;
603+
} else {
604+
SA_LOG_D("[" << TabletID() << "] ScheduleNextAnalyze. Skip analyze for datashard table " << operationTable.PathId);
605+
UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::AnalyzeFinished, operation.OperationId, operationTable, db);
606+
return;
607+
}
608+
}
609+
}
610+
611+
SA_LOG_D("[" << TabletID() << "] ScheduleNextAnalyze. All the force traversal tables sent the requests. OperationId=" << operation.OperationId);
612+
continue;
579613
}
580614

615+
SA_LOG_D("[" << TabletID() << "] ScheduleNextAnalyze. All the force traversal operations sent the requests.");
616+
}
617+
618+
void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
619+
SA_LOG_D("[" << TabletID() << "] ScheduleNextTraversal");
620+
581621
TPathId pathId;
582622

583623
if (!LastTraversalWasForce) {
584624
LastTraversalWasForce = true;
585625

586626
for (TForceTraversalOperation& operation : ForceTraversals) {
587627
for (TForceTraversalTable& operationTable : operation.Tables) {
588-
if (operationTable.Status == TForceTraversalTable::EStatus::None) {
589-
operationTable.Status = TForceTraversalTable::EStatus::RequestSent;
590-
db.Table<Schema::ForceTraversalTables>().Key(operation.OperationId, operationTable.PathId.OwnerId, operationTable.PathId.LocalPathId)
591-
.Update(NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)operationTable.Status));
592-
628+
if (operationTable.Status == TForceTraversalTable::EStatus::AnalyzeFinished) {
629+
UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::TraversalStarted, operation.OperationId, operationTable, db);
593630
pathId = operationTable.PathId;
594631
break;
595632
}
596633
}
597634

598635
if (!pathId) {
599-
SA_LOG_D("[" << TabletID() << "] All the force traversal tables sent the requests. OperationId=" << operation.OperationId);
636+
SA_LOG_D("[" << TabletID() << "] ScheduleNextTraversal. All the force traversal tables sent the requests. OperationId=" << operation.OperationId);
600637
continue;
601638
}
602639

603640
ForceTraversalOperationId = operation.OperationId;
604641
}
605642

606643
if (!pathId) {
607-
SA_LOG_D("[" << TabletID() << "] All the force traversal operations sent the requests.");
644+
SA_LOG_D("[" << TabletID() << "] ScheduleNextTraversal. All the force traversal operations sent the requests.");
608645
}
609646
}
610647

@@ -626,19 +663,19 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
626663
return;
627664
}
628665

629-
auto itPath = ScheduleTraversals.find(pathId);
630-
if (itPath != ScheduleTraversals.end()) {
631-
TraversalIsColumnTable = itPath->second.IsColumnTable;
632-
} else {
633-
SA_LOG_E("[" << TabletID() << "] traversal path " << pathId << " is not known to schemeshard");
666+
TraversalPathId = pathId;
667+
668+
std::optional<bool> isColumnTable = IsColumnTable(pathId);
669+
if (!isColumnTable){
670+
DeleteStatisticsFromTable();
634671
return;
635672
}
636673

637-
TraversalTableId.PathId = pathId;
674+
TraversalIsColumnTable = *isColumnTable;
638675

639676
SA_LOG_D("[" << TabletID() << "] Start "
640677
<< LastTraversalWasForceString()
641-
<< " traversal for path " << pathId);
678+
<< " traversal navigate for path " << pathId);
642679

643680
StartTraversal(db);
644681
}
@@ -650,11 +687,12 @@ void TStatisticsAggregator::StartTraversal(NIceDb::TNiceDb& db) {
650687
TraversalStartKey = TSerializedCellVec();
651688
PersistStartKey(db);
652689

690+
NavigatePathId = TraversalPathId;
653691
Navigate();
654692
}
655693

656694
void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) {
657-
auto pathId = TraversalTableId.PathId;
695+
auto pathId = TraversalPathId;
658696

659697
auto pathIt = ScheduleTraversals.find(pathId);
660698
if (pathIt != ScheduleTraversals.end()) {
@@ -670,8 +708,12 @@ void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) {
670708

671709
auto forceTraversalOperation = CurrentForceTraversalOperation();
672710
if (forceTraversalOperation) {
711+
auto operationTable = CurrentForceTraversalTable();
712+
713+
UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::TraversalFinished, forceTraversalOperation->OperationId, *operationTable, db);
714+
673715
bool tablesRemained = std::any_of(forceTraversalOperation->Tables.begin(), forceTraversalOperation->Tables.end(),
674-
[](const TForceTraversalTable& elem) { return elem.Status == TForceTraversalTable::EStatus::None;});
716+
[](const TForceTraversalTable& elem) { return elem.Status != TForceTraversalTable::EStatus::TraversalFinished;});
675717
if (!tablesRemained) {
676718
DeleteForceTraversalOperation(ForceTraversalOperationId, db);
677719
}
@@ -699,6 +741,21 @@ TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::ForceTra
699741
}
700742
}
701743

744+
std::optional<bool> TStatisticsAggregator::IsColumnTable(const TPathId& pathId) const {
745+
Y_ABORT_UNLESS(IsSchemeshardSeen);
746+
747+
auto itPath = ScheduleTraversals.find(pathId);
748+
if (itPath != ScheduleTraversals.end()) {
749+
bool ret = itPath->second.IsColumnTable;
750+
SA_LOG_D("[" << TabletID() << "] IsColumnTable. Path " << pathId << " is "
751+
<< (ret ? "column" : "data") << " table.");
752+
return ret;
753+
} else {
754+
SA_LOG_E("[" << TabletID() << "] IsColumnTable. traversal path " << pathId << " is not known to schemeshard");
755+
return {};
756+
}
757+
}
758+
702759
void TStatisticsAggregator::DeleteForceTraversalOperation(const TString& operationId, NIceDb::TNiceDb& db) {
703760
db.Table<Schema::ForceTraversalOperations>().Key(ForceTraversalOperationId).Delete();
704761

@@ -725,17 +782,24 @@ TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::ForceTravers
725782
}
726783

727784
TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::CurrentForceTraversalTable() {
728-
return ForceTraversalTable(ForceTraversalOperationId, TraversalTableId.PathId);
785+
return ForceTraversalTable(ForceTraversalOperationId, TraversalPathId);
786+
}
787+
788+
void TStatisticsAggregator::UpdateForceTraversalTableStatus(const TForceTraversalTable::EStatus status, const TString& operationId, TStatisticsAggregator::TForceTraversalTable& table, NIceDb::TNiceDb& db) {
789+
table.Status = status;
790+
db.Table<Schema::ForceTraversalTables>().Key(operationId, table.PathId.OwnerId, table.PathId.LocalPathId)
791+
.Update(NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)status));
729792
}
730793

794+
731795
void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {
732796
db.Table<Schema::SysParams>().Key(id).Update(
733797
NIceDb::TUpdate<Schema::SysParams::Value>(value));
734798
}
735799

736800
void TStatisticsAggregator::PersistTraversal(NIceDb::TNiceDb& db) {
737-
PersistSysParam(db, Schema::SysParam_TraversalTableOwnerId, ToString(TraversalTableId.PathId.OwnerId));
738-
PersistSysParam(db, Schema::SysParam_TraversalTableLocalPathId, ToString(TraversalTableId.PathId.LocalPathId));
801+
PersistSysParam(db, Schema::SysParam_TraversalTableOwnerId, ToString(TraversalPathId.OwnerId));
802+
PersistSysParam(db, Schema::SysParam_TraversalTableLocalPathId, ToString(TraversalPathId.LocalPathId));
739803
PersistSysParam(db, Schema::SysParam_TraversalStartTime, ToString(TraversalStartTime.MicroSeconds()));
740804
PersistSysParam(db, Schema::SysParam_TraversalIsColumnTable, ToString(TraversalIsColumnTable));
741805
}
@@ -750,7 +814,7 @@ void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
750814

751815
void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
752816
ForceTraversalOperationId.clear();
753-
TraversalTableId.PathId = TPathId();
817+
TraversalPathId = {};
754818
TraversalStartTime = TInstant::MicroSeconds(0);
755819
PersistTraversal(db);
756820

@@ -856,7 +920,7 @@ bool TStatisticsAggregator::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev
856920
str << "PendingRequests: " << PendingRequests.size() << Endl;
857921
str << "ProcessUrgentInFlight: " << ProcessUrgentInFlight << Endl << Endl;
858922

859-
str << "TraversalTableId: " << TraversalTableId << Endl;
923+
str << "TraversalPathId: " << TraversalPathId << Endl;
860924
str << "Columns: " << Columns.size() << Endl;
861925
str << "DatashardRanges: " << DatashardRanges.size() << Endl;
862926
str << "CountMinSketches: " << CountMinSketches.size() << Endl << Endl;

0 commit comments

Comments
 (0)