Skip to content

Commit 4fddb0c

Browse files
authored
Statistic: Delete analyze after deadline (#8214)
1 parent 43019e6 commit 4fddb0c

File tree

7 files changed

+92
-2
lines changed

7 files changed

+92
-2
lines changed

ydb/core/protos/counters_statistics_aggregator.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ enum ETxTypes {
2323
TXTYPE_ANALYZE_TABLE_REQUEST = 13 [(TxTypeOpts) = {Name: "TxAnalyzeTableRequest"}];
2424
TXTYPE_ANALYZE_TABLE_RESPONSE = 14 [(TxTypeOpts) = {Name: "TxAnalyzeTableResponse"}];
2525
TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM = 15 [(TxTypeOpts) = {Name: "TTxAnalyzeTableDeliveryProblem"}];
26+
TXTYPE_ANALYZE_DEADLINE = 16 [(TxTypeOpts) = {Name: "TTxAnalyzeDeadline"}];
2627
}

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
4949
struct TTxAnalyzeTableRequest;
5050
struct TTxAnalyzeTableResponse;
5151
struct TTxAnalyzeTableDeliveryProblem;
52+
struct TTxAnalyzeDeadline;
5253
struct TTxNavigate;
5354
struct TTxResolve;
5455
struct TTxDatashardScanResponse;
@@ -70,6 +71,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
7071
EvAckTimeout,
7172
EvSendAnalyze,
7273
EvAnalyzeDeliveryProblem,
74+
EvAnalyzeDeadline,
7375

7476
EvEnd
7577
};
@@ -83,6 +85,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
8385
struct TEvResolve : public TEventLocal<TEvResolve, EvResolve> {};
8486
struct TEvSendAnalyze : public TEventLocal<TEvSendAnalyze, EvSendAnalyze> {};
8587
struct TEvAnalyzeDeliveryProblem : public TEventLocal<TEvAnalyzeDeliveryProblem, EvAnalyzeDeliveryProblem> {};
88+
struct TEvAnalyzeDeadline : public TEventLocal<TEvAnalyzeDeadline, EvAnalyzeDeadline> {};
8689

8790
struct TEvAckTimeout : public TEventLocal<TEvAckTimeout, EvAckTimeout> {
8891
size_t SeqNo = 0;
@@ -146,6 +149,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
146149
void Handle(TEvPrivate::TEvAckTimeout::TPtr& ev);
147150
void Handle(TEvPrivate::TEvSendAnalyze::TPtr& ev);
148151
void Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr& ev);
152+
void Handle(TEvPrivate::TEvAnalyzeDeadline::TPtr& ev);
149153

150154
void InitializeStatisticsTable();
151155
void Navigate();
@@ -209,6 +213,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
209213
hFunc(TEvPrivate::TEvAckTimeout, Handle);
210214
hFunc(TEvPrivate::TEvSendAnalyze, Handle);
211215
hFunc(TEvPrivate::TEvAnalyzeDeliveryProblem, Handle);
216+
hFunc(TEvPrivate::TEvAnalyzeDeadline, Handle);
212217

213218
default:
214219
if (!HandleDefaultEvents(ev, SelfId())) {
@@ -316,6 +321,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
316321
static constexpr size_t SendAnalyzeCount = 100;
317322
static constexpr TDuration SendAnalyzePeriod = TDuration::Seconds(1);
318323
static constexpr TDuration AnalyzeDeliveryProblemPeriod = TDuration::Seconds(1);
324+
static constexpr TDuration AnalyzeDeadline = TDuration::Days(1);
325+
static constexpr TDuration AnalyzeDeadlinePeriod = TDuration::Seconds(1);
319326

320327
enum ENavigateType {
321328
Analyze,
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#include "aggregator_impl.h"
2+
3+
#include <ydb/core/protos/hive.pb.h>
4+
#include <ydb/core/statistics/service/service.h>
5+
6+
#include <util/string/vector.h>
7+
8+
namespace NKikimr::NStat {
9+
10+
struct TStatisticsAggregator::TTxAnalyzeDeadline : public TTxBase {
11+
TString OperationId;
12+
TActorId ReplyToActorId;
13+
14+
TTxAnalyzeDeadline(TSelf* self)
15+
: TTxBase(self)
16+
{}
17+
18+
TTxType GetTxType() const override { return TXTYPE_ANALYZE_DEADLINE; }
19+
20+
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
21+
SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Execute");
22+
23+
NIceDb::TNiceDb db(txc.DB);
24+
auto now = ctx.Now();
25+
26+
for (TForceTraversalOperation& operation : Self->ForceTraversals) {
27+
if (operation.CreatedAt + Self->AnalyzeDeadline < now) {
28+
SA_LOG_E("[" << Self->TabletID() << "] Delete long analyze operation, OperationId=" << operation.OperationId);
29+
30+
OperationId = operation.OperationId;
31+
ReplyToActorId = operation.ReplyToActorId;
32+
Self->DeleteForceTraversalOperation(operation.OperationId, db);
33+
break;
34+
}
35+
}
36+
37+
return true;
38+
}
39+
40+
void Complete(const TActorContext& ctx) override {
41+
SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete");
42+
43+
if (OperationId) {
44+
if (ReplyToActorId) {
45+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete. " <<
46+
"Send TEvAnalyzeResponse for deleted operation, OperationId=" << OperationId << ", ActorId=" << ReplyToActorId);
47+
auto response = std::make_unique<TEvStatistics::TEvAnalyzeResponse>();
48+
response->Record.SetOperationId(OperationId);
49+
ctx.Send(ReplyToActorId, response.release());
50+
} else {
51+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete. No ActorId to send reply. OperationId=" << OperationId);
52+
}
53+
ctx.Send(Self->SelfId(), new TEvPrivate::TEvAnalyzeDeadline());
54+
} else {
55+
ctx.Schedule(AnalyzeDeadlinePeriod, new TEvPrivate::TEvAnalyzeDeadline());
56+
}
57+
}
58+
};
59+
60+
void TStatisticsAggregator::Handle(TEvPrivate::TEvAnalyzeDeadline::TPtr&) {
61+
Execute(new TTxAnalyzeDeadline(this),
62+
TActivationContext::AsActorContext());
63+
}
64+
65+
} // NKikimr::NStat

ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
namespace NKikimr::NStat {
99

1010
struct TStatisticsAggregator::TTxAnalyzeTableDeliveryProblem : public TTxBase {
11-
std::vector<std::unique_ptr<IEventBase>> Events;
12-
1311
TTxAnalyzeTableDeliveryProblem(TSelf* self)
1412
: TTxBase(self)
1513
{}

ydb/core/statistics/aggregator/tx_init.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
277277
Self->Schedule(Self->TraversalPeriod, new TEvPrivate::TEvScheduleTraversal());
278278
Self->Schedule(Self->SendAnalyzePeriod, new TEvPrivate::TEvSendAnalyze());
279279
Self->Schedule(Self->AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem());
280+
Self->Schedule(Self->AnalyzeDeadlinePeriod, new TEvPrivate::TEvAnalyzeDeadline());
280281
} else {
281282
SA_LOG_W("[" << Self->TabletID() << "] TTxInit::Complete. EnableColumnStatistics=false");
282283
}

ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,23 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
258258

259259
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
260260
}
261+
262+
Y_UNIT_TEST(AnalyzeDeadline) {
263+
TTestEnv env(1, 1);
264+
auto& runtime = *env.GetServer().GetRuntime();
265+
auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0];
266+
auto sender = runtime.AllocateEdgeActor();
267+
268+
TBlockEvents<TEvStatistics::TEvAnalyzeTableResponse> block(runtime);
269+
270+
auto analyzeRequest = MakeAnalyzeRequest({tableInfo.PathId});
271+
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest.release());
272+
273+
runtime.WaitFor("TEvAnalyzeTableResponse", [&]{ return block.size(); });
274+
runtime.AdvanceCurrentTime(TDuration::Days(2));
275+
276+
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
277+
}
261278
}
262279

263280
} // NStat

ydb/core/statistics/aggregator/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SRCS(
1010
tx_ack_timeout.cpp
1111
tx_aggr_stat_response.cpp
1212
tx_analyze.cpp
13+
tx_analyze_deadline.cpp
1314
tx_analyze_table_delivery_problem.cpp
1415
tx_analyze_table_request.cpp
1516
tx_analyze_table_response.cpp

0 commit comments

Comments
 (0)