Skip to content

Commit cee8afe

Browse files
authored
Statistics: reset events in test observers (#8118)
1 parent d5924e3 commit cee8afe

File tree

5 files changed

+65
-26
lines changed

5 files changed

+65
-26
lines changed

ydb/core/statistics/aggregator/tx_init.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
284284
Self->InitializeStatisticsTable();
285285

286286
if (Self->TraversalPathId && Self->TraversalStartKey) {
287+
SA_LOG_D("[" << Self->TabletID() << "] TTxInit::Complete. Start navigate. PathId " << Self->TraversalPathId);
287288
Self->NavigateType = ENavigateType::Traversal;
288289
Self->NavigatePathId = Self->TraversalPathId;
289290
Self->Navigate();

ydb/core/statistics/aggregator/tx_schedule_traversal.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ struct TStatisticsAggregator::TTxScheduleTrasersal : public TTxBase {
1212
TTxType GetTxType() const override { return TXTYPE_SCHEDULE_TRAVERSAL; }
1313

1414
bool Execute(TTransactionContext& txc, const TActorContext&) override {
15-
SA_LOG_T("[" << Self->TabletID() << "] TTxScheduleTrasersal::Execute");
1615

1716
if (!Self->EnableColumnStatistics) {
1817
return true;
@@ -28,6 +27,8 @@ struct TStatisticsAggregator::TTxScheduleTrasersal : public TTxBase {
2827
return true;
2928
}
3029

30+
SA_LOG_T("[" << Self->TabletID() << "] TTxScheduleTrasersal::Execute");
31+
3132
NIceDb::TNiceDb db(txc.DB);
3233

3334
switch (Self->NavigateType) {

ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,16 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
9494
auto sender = runtime.AllocateEdgeActor();
9595

9696
bool eventSeen = false;
97-
auto observer = runtime.AddObserver<TEvStatistics::TEvAnalyzeTableResponse>([&](auto&) {
97+
auto observer = runtime.AddObserver<TEvStatistics::TEvAnalyzeTableResponse>([&](auto& ev) {
9898
eventSeen = true;
99+
ev.Reset();
99100
});
100101

101102
auto analyzeRequest1 = MakeAnalyzeRequest({tableInfo.PathId});
102103
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest1.release());
103104

104105
runtime.WaitFor("TEvAnalyzeTableResponse", [&]{ return eventSeen; });
106+
observer.Remove();
105107
RebootTablet(runtime, tableInfo.SaTabletId, sender);
106108

107109
auto analyzeRequest2 = MakeAnalyzeRequest({tableInfo.PathId});
@@ -116,16 +118,21 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
116118
auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0];
117119
auto sender = runtime.AllocateEdgeActor();
118120

119-
int observerCount = 0;
120-
auto observer = runtime.AddObserver<TEvTxProxySchemeCache::TEvResolveKeySetResult>([&](auto&){
121-
observerCount++;
122-
});
121+
TBlockEvents<TEvTxProxySchemeCache::TEvResolveKeySetResult> block(runtime);
123122

124123
auto analyzeRequest1 = MakeAnalyzeRequest({tableInfo.PathId});
125124
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest1.release());
126-
127-
runtime.WaitFor("TEvResolveKeySetResult", [&]{ return observerCount == 3; });
125+
126+
runtime.WaitFor("1st TEvResolveKeySetResult", [&]{ return block.size() >= 1; });
127+
block.Unblock(1);
128+
runtime.WaitFor("2nd TEvResolveKeySetResult", [&]{ return block.size() >= 1; });
129+
block.Unblock(1);
130+
runtime.WaitFor("3rd TEvResolveKeySetResult", [&]{ return block.size() >= 1; });
131+
128132
RebootTablet(runtime, tableInfo.SaTabletId, sender);
133+
134+
block.Unblock();
135+
block.Stop();
129136

130137
auto analyzeRequest2 = MakeAnalyzeRequest({tableInfo.PathId});
131138
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest2.release());
@@ -140,14 +147,16 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
140147
auto sender = runtime.AllocateEdgeActor();
141148

142149
bool eventSeen = false;
143-
auto observer = runtime.AddObserver<TEvHive::TEvRequestTabletDistribution>([&](auto&) {
150+
auto observer = runtime.AddObserver<TEvHive::TEvRequestTabletDistribution>([&](auto& ev) {
144151
eventSeen = true;
152+
ev.Reset();
145153
});
146154

147155
auto analyzeRequest1 = MakeAnalyzeRequest({tableInfo.PathId});
148156
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest1.release());
149157

150158
runtime.WaitFor("TEvRequestTabletDistribution", [&]{ return eventSeen; });
159+
observer.Remove();
151160
RebootTablet(runtime, tableInfo.SaTabletId, sender);
152161

153162
auto analyzeRequest2 = MakeAnalyzeRequest({tableInfo.PathId});
@@ -163,14 +172,16 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
163172
auto sender = runtime.AllocateEdgeActor();
164173

165174
bool eventSeen = false;
166-
auto observer = runtime.AddObserver<TEvStatistics::TEvAggregateStatistics>([&](auto&){
175+
auto observer = runtime.AddObserver<TEvStatistics::TEvAggregateStatistics>([&](auto& ev){
167176
eventSeen = true;
177+
ev.Reset();
168178
});
169179

170180
auto analyzeRequest1 = MakeAnalyzeRequest({tableInfo.PathId});
171181
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest1.release());
172182

173183
runtime.WaitFor("TEvAggregateStatistics", [&]{ return eventSeen; });
184+
observer.Remove();
174185
RebootTablet(runtime, tableInfo.SaTabletId, sender);
175186

176187
auto analyzeRequest2 = MakeAnalyzeRequest({tableInfo.PathId});
@@ -186,14 +197,16 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
186197
auto sender = runtime.AllocateEdgeActor();
187198

188199
bool eventSeen = false;
189-
auto observer = runtime.AddObserver<TEvStatistics::TEvAggregateStatisticsResponse>([&](auto&){
200+
auto observer = runtime.AddObserver<TEvStatistics::TEvAggregateStatisticsResponse>([&](auto& ev){
190201
eventSeen = true;
202+
ev.Reset();
191203
});
192204

193205
auto analyzeRequest1 = MakeAnalyzeRequest({tableInfo.PathId});
194206
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest1.release());
195207

196208
runtime.WaitFor("TEvAggregateStatisticsResponse", [&]{ return eventSeen; });
209+
observer.Remove();
197210
RebootTablet(runtime, tableInfo.SaTabletId, sender);
198211

199212
auto analyzeRequest2 = MakeAnalyzeRequest({tableInfo.PathId});
@@ -202,22 +215,24 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
202215
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
203216
}
204217

205-
//
206218
Y_UNIT_TEST(AnalyzeRebootSaInAggregate) {
207219
TTestEnv env(1, 1);
208220
auto& runtime = *env.GetServer().GetRuntime();
209221
auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0];
210222
auto sender = runtime.AllocateEdgeActor();
211223

212224
int observerCount = 0;
213-
auto observer = runtime.AddObserver<TEvStatistics::TEvStatisticsRequest>([&](auto&) {
214-
observerCount++;
225+
auto observer = runtime.AddObserver<TEvStatistics::TEvStatisticsRequest>([&](auto& ev) {
226+
if (++observerCount >= 5) {
227+
ev.Reset();
228+
}
215229
});
216230

217231
auto analyzeRequest1 = MakeAnalyzeRequest({tableInfo.PathId});
218232
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest1.release());
219233

220-
runtime.WaitFor("5th TEvStatisticsRequest", [&]{ return observerCount == 5; });
234+
runtime.WaitFor("5th TEvStatisticsRequest", [&]{ return observerCount >= 5; });
235+
observer.Remove();
221236
RebootTablet(runtime, tableInfo.SaTabletId, sender);
222237

223238
auto analyzeRequest2 = MakeAnalyzeRequest({tableInfo.PathId});

ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
77
#include <ydb/core/statistics/events.h>
88
#include <ydb/core/statistics/service/service.h>
9+
#include <ydb/core/testlib/actors/block_events.h>
910

1011
namespace NKikimr {
1112
namespace NStat {
@@ -28,13 +29,20 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) {
2829
auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0];
2930
auto sender = runtime.AllocateEdgeActor();
3031

31-
int eventCount = 0;
32-
auto observer = runtime.AddObserver<TEvTxProxySchemeCache::TEvResolveKeySetResult>([&](auto&) {
33-
eventCount++;
34-
});
32+
TBlockEvents<TEvTxProxySchemeCache::TEvResolveKeySetResult> block(runtime);
3533

36-
runtime.WaitFor("TEvResolveKeySetResult", [&]{ return eventCount == 3; });
34+
runtime.WaitFor("1st TEvResolveKeySetResult", [&]{ return block.size() >= 1; });
35+
block.Unblock(1);
36+
runtime.WaitFor("2nd TEvResolveKeySetResult", [&]{ return block.size() >= 1; });
37+
block.Unblock(1);
38+
runtime.WaitFor("3rd TEvResolveKeySetResult", [&]{ return block.size() >= 1; });
39+
3740
RebootTablet(runtime, tableInfo.SaTabletId, sender);
41+
42+
block.Unblock();
43+
block.Stop();
44+
45+
runtime.SimulateSleep(TDuration::Seconds(10));
3846

3947
ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10);
4048
}
@@ -46,11 +54,13 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) {
4654
auto sender = runtime.AllocateEdgeActor();
4755

4856
bool eventSeen = false;
49-
auto observer = runtime.AddObserver<TEvHive::TEvRequestTabletDistribution>([&](auto&){
57+
auto observer = runtime.AddObserver<TEvHive::TEvRequestTabletDistribution>([&](auto& ev){
5058
eventSeen = true;
59+
ev.Reset();
5160
});
5261

5362
runtime.WaitFor("TEvRequestTabletDistribution", [&]{ return eventSeen; });
63+
observer.Remove();
5464
RebootTablet(runtime, tableInfo.SaTabletId, sender);
5565

5666
ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10);
@@ -63,11 +73,13 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) {
6373
auto sender = runtime.AllocateEdgeActor();
6474

6575
bool eventSeen = false;
66-
auto observer = runtime.AddObserver<TEvStatistics::TEvAggregateStatistics>([&](auto&){
76+
auto observer = runtime.AddObserver<TEvStatistics::TEvAggregateStatistics>([&](auto& ev){
6777
eventSeen = true;
78+
ev.Reset();
6879
});
6980

7081
runtime.WaitFor("TEvAggregateStatistics", [&]{ return eventSeen; });
82+
observer.Remove();
7183
RebootTablet(runtime, tableInfo.SaTabletId, sender);
7284

7385
ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10);
@@ -80,11 +92,13 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) {
8092
auto sender = runtime.AllocateEdgeActor();
8193

8294
bool eventSeen = false;
83-
auto observer = runtime.AddObserver<TEvStatistics::TEvAggregateStatisticsResponse>([&](auto&){
95+
auto observer = runtime.AddObserver<TEvStatistics::TEvAggregateStatisticsResponse>([&](auto& ev){
8496
eventSeen = true;
97+
ev.Reset();
8598
});
8699

87100
runtime.WaitFor("TEvAggregateStatisticsResponse", [&]{ return eventSeen; });
101+
observer.Remove();
88102
RebootTablet(runtime, tableInfo.SaTabletId, sender);
89103

90104
ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10);
@@ -97,11 +111,14 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) {
97111
auto sender = runtime.AllocateEdgeActor();
98112

99113
int observerCount = 0;
100-
auto observer = runtime.AddObserver<TEvStatistics::TEvStatisticsRequest>([&](auto&){
101-
observerCount++;
114+
auto observer = runtime.AddObserver<TEvStatistics::TEvStatisticsRequest>([&](auto& ev){
115+
if (++observerCount >= 5) {
116+
ev.Reset();
117+
}
102118
});
103119

104-
runtime.WaitFor("5th TEvStatisticsRequest", [&]{ return observerCount == 5; });
120+
runtime.WaitFor("5th TEvStatisticsRequest", [&]{ return observerCount >= 5; });
121+
observer.Remove();
105122
RebootTablet(runtime, tableInfo.SaTabletId, sender);
106123

107124
ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10);

ydb/core/statistics/service/service_impl.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -909,6 +909,11 @@ class TStatService : public TActorBootstrapped<TStatService> {
909909
}
910910

911911
NTabletPipe::SendData(SelfId(), clientId, request.release(), AggregationStatistics.Round);
912+
913+
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
914+
"TEvStatisticsRequest send"
915+
<< ", client id = " << clientId
916+
<< ", path = " << *path);
912917
}
913918

914919
void OnTabletError(ui64 tabletId) {

0 commit comments

Comments
 (0)