Skip to content

Commit 1869eed

Browse files
authored
Statistics: retry AnalyzeTable to ColumnShard (#8190)
1 parent 3ca30a5 commit 1869eed

File tree

7 files changed

+89
-1
lines changed

7 files changed

+89
-1
lines changed

ydb/core/protos/counters_statistics_aggregator.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ enum ETxTypes {
2222
TXTYPE_ACK_TIMEOUT = 12 [(TxTypeOpts) = {Name: "TxAckTimeout"}];
2323
TXTYPE_ANALYZE_TABLE_REQUEST = 13 [(TxTypeOpts) = {Name: "TxAnalyzeTableRequest"}];
2424
TXTYPE_ANALYZE_TABLE_RESPONSE = 14 [(TxTypeOpts) = {Name: "TxAnalyzeTableResponse"}];
25+
// Display name follows the "Tx<Operation>" pattern of the sibling entries (no leading "T").
TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM = 15 [(TxTypeOpts) = {Name: "TxAnalyzeTableDeliveryProblem"}];
2526
}

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,11 +404,24 @@ void TStatisticsAggregator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
404404
auto tabletId = ev->Get()->TabletId;
405405
if (TraversalIsColumnTable) {
406406
if (tabletId == HiveId) {
407+
SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with HiveId=" << tabletId);
407408
Schedule(HiveRetryInterval, new TEvPrivate::TEvRequestDistribution);
408409
} else {
410+
for (TForceTraversalOperation& operation : ForceTraversals) {
411+
for (TForceTraversalTable& operationTable : operation.Tables) {
412+
for (TAnalyzedShard& shard : operationTable.AnalyzedShards) {
413+
if (shard.ShardTabletId == tabletId) {
414+
SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with ColumnShard=" << tabletId);
415+
shard.Status = TAnalyzedShard::EStatus::DeliveryProblem;
416+
return;
417+
}
418+
}
419+
}
420+
}
409421
SA_LOG_CRIT("[" << TabletID() << "] TEvDeliveryProblem with unexpected tablet " << tabletId);
410422
}
411423
} else {
424+
SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with DataShard=" << tabletId);
412425
if (DatashardRanges.empty()) {
413426
return;
414427
}

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
4848
struct TTxAnalyze;
4949
struct TTxAnalyzeTableRequest;
5050
struct TTxAnalyzeTableResponse;
51+
struct TTxAnalyzeTableDeliveryProblem;
5152
struct TTxNavigate;
5253
struct TTxResolve;
5354
struct TTxDatashardScanResponse;
@@ -68,6 +69,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
6869
EvResolve,
6970
EvAckTimeout,
7071
EvSendAnalyze,
72+
EvAnalyzeDeliveryProblem,
7173

7274
EvEnd
7375
};
@@ -80,6 +82,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
8082
struct TEvRequestDistribution : public TEventLocal<TEvRequestDistribution, EvRequestDistribution> {};
8183
struct TEvResolve : public TEventLocal<TEvResolve, EvResolve> {};
8284
struct TEvSendAnalyze : public TEventLocal<TEvSendAnalyze, EvSendAnalyze> {};
85+
struct TEvAnalyzeDeliveryProblem : public TEventLocal<TEvAnalyzeDeliveryProblem, EvAnalyzeDeliveryProblem> {};
8386

8487
struct TEvAckTimeout : public TEventLocal<TEvAckTimeout, EvAckTimeout> {
8588
size_t SeqNo = 0;
@@ -142,6 +145,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
142145
void Handle(TEvStatistics::TEvAggregateKeepAlive::TPtr& ev);
143146
void Handle(TEvPrivate::TEvAckTimeout::TPtr& ev);
144147
void Handle(TEvPrivate::TEvSendAnalyze::TPtr& ev);
148+
void Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr& ev);
145149

146150
void InitializeStatisticsTable();
147151
void Navigate();
@@ -204,6 +208,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
204208
hFunc(TEvStatistics::TEvAggregateKeepAlive, Handle);
205209
hFunc(TEvPrivate::TEvAckTimeout, Handle);
206210
hFunc(TEvPrivate::TEvSendAnalyze, Handle);
211+
hFunc(TEvPrivate::TEvAnalyzeDeliveryProblem, Handle);
207212

208213
default:
209214
if (!HandleDefaultEvents(ev, SelfId())) {
@@ -311,6 +316,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
311316

312317
static constexpr size_t SendAnalyzeCount = 100;
313318
static constexpr TDuration SendAnalyzePeriod = TDuration::Seconds(1);
319+
static constexpr TDuration AnalyzeDeliveryProblemPeriod = TDuration::Seconds(1);
314320

315321
enum ENavigateType {
316322
Analyze,
@@ -348,6 +354,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
348354

349355
enum class EStatus : ui8 {
350356
None,
357+
DeliveryProblem,
351358
AnalyzeStarted,
352359
AnalyzeFinished,
353360
};
ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#include "aggregator_impl.h"
2+
3+
#include <ydb/core/protos/hive.pb.h>
4+
#include <ydb/core/statistics/service/service.h>
5+
6+
#include <util/string/vector.h>
7+
8+
namespace NKikimr::NStat {
9+
10+
// Periodic transaction that clears the DeliveryProblem mark from analyzed shards.
//
// Execute() walks every force-traversal operation, every table of that operation
// and every analyzed shard of that table; each shard whose status is
// DeliveryProblem is put back into the None status.
// NOTE(review): presumably a shard in the None status is picked up again by the
// TEvSendAnalyze handler, which is what makes this a retry - confirm against
// that handler, it is not part of this file.
//
// Complete() re-arms the timer, so the transaction runs again after
// AnalyzeDeliveryProblemPeriod.
struct TStatisticsAggregator::TTxAnalyzeTableDeliveryProblem : public TTxBase {
    TTxAnalyzeTableDeliveryProblem(TSelf* self)
        : TTxBase(self)
    {}

    TTxType GetTxType() const override { return TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM; }

    // Only in-memory shard statuses are touched: the transaction context is
    // not used. Always returns true.
    bool Execute(TTransactionContext&, const TActorContext&) override {
        SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeTableDeliveryProblem::Execute");

        for (TForceTraversalOperation& operation : Self->ForceTraversals) {
            for (TForceTraversalTable& operationTable : operation.Tables) {
                for (TAnalyzedShard& analyzedShard : operationTable.AnalyzedShards) {
                    if (analyzedShard.Status != TAnalyzedShard::EStatus::DeliveryProblem) {
                        continue;
                    }

                    SA_LOG_D("[" << Self->TabletID() << "] Reset DeliveryProblem to ColumnShard=" << analyzedShard.ShardTabletId);
                    analyzedShard.Status = TAnalyzedShard::EStatus::None;
                }
            }
        }

        return true;
    }

    // Schedules the next TEvAnalyzeDeliveryProblem tick.
    void Complete(const TActorContext& ctx) override {
        SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeTableDeliveryProblem::Complete");

        ctx.Schedule(AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem());
    }
};
42+
43+
// Timer tick: runs TTxAnalyzeTableDeliveryProblem as a tablet transaction.
// The event itself carries no payload, so the argument is ignored.
void TStatisticsAggregator::Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr&) {
    const auto& actorContext = TActivationContext::AsActorContext();
    Execute(new TTxAnalyzeTableDeliveryProblem(this), actorContext);
}
47+
48+
} // NKikimr::NStat

ydb/core/statistics/aggregator/tx_init.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
274274
if (Self->EnableColumnStatistics) {
275275
Self->Schedule(Self->TraversalPeriod, new TEvPrivate::TEvScheduleTraversal());
276276
Self->Schedule(Self->SendAnalyzePeriod, new TEvPrivate::TEvSendAnalyze());
277+
Self->Schedule(Self->AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem());
277278
} else {
278279
SA_LOG_W("[" << Self->TabletID() << "] TTxInit::Complete. EnableColumnStatistics=false");
279280
}

ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
66

7-
#include <thread>
7+
#include <ydb/core/testlib/actors/block_events.h>
88

99
namespace NKikimr {
1010
namespace NStat {
@@ -226,6 +226,23 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
226226
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
227227
}
228228

229+
// Scenario: the column shard is rebooted while its TEvAnalyzeTableResponse is
// held back; the test then waits for the final TEvAnalyzeResponse on the edge actor.
Y_UNIT_TEST(AnalyzeRebootColumnShard) {
    TTestEnv env(1, 1);
    auto& runtime = *env.GetServer().GetRuntime();
    const auto tables = CreateDatabaseColumnTables(env, 1, 1);
    const auto& tableInfo = tables.front();
    const auto edge = runtime.AllocateEdgeActor();

    // Intercept the shard's responses so the first one never reaches its destination.
    TBlockEvents<TEvStatistics::TEvAnalyzeTableResponse> blockedResponses(runtime);

    runtime.SendToPipe(tableInfo.SaTabletId, edge, MakeAnalyzeRequest({tableInfo.PathId}).release());

    // Wait until a response has been captured, then stop intercepting and
    // reboot the shard.
    runtime.WaitFor("TEvAnalyzeTableResponse", [&blockedResponses] {
        return blockedResponses.size();
    });
    blockedResponses.Stop();
    RebootTablet(runtime, tableInfo.ShardIds[0], edge);

    runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(edge);
}
229246
}
230247

231248
} // NStat

ydb/core/statistics/aggregator/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SRCS(
1010
tx_ack_timeout.cpp
1111
tx_aggr_stat_response.cpp
1212
tx_analyze.cpp
13+
tx_analyze_table_delivery_problem.cpp
1314
tx_analyze_table_request.cpp
1415
tx_analyze_table_response.cpp
1516
tx_configure.cpp

0 commit comments

Comments
 (0)