7
7
#include < ydb/core/statistics/events.h>
8
8
#include < ydb/core/statistics/service/service.h>
9
9
10
- #include < ydb/public/sdk/cpp/client/ydb_result/result.h>
11
- #include < ydb/public/sdk/cpp/client/ydb_table/table.h>
12
- #include < ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
13
-
14
10
#include < thread>
15
11
16
12
namespace NKikimr {
17
13
namespace NStat {
18
14
19
- using namespace NYdb ;
20
- using namespace NYdb ::NTable;
21
- using namespace NYdb ::NScheme;
22
-
23
15
namespace {
24
16
25
- void CreateUniformTable (TTestEnv& env, const TString& databaseName, const TString& tableName) {
26
- TTableClient client (env.GetDriver ());
27
- auto session = client.CreateSession ().GetValueSync ().GetSession ();
28
-
29
- auto result = session.ExecuteSchemeQuery (Sprintf (R"(
30
- CREATE TABLE `Root/%s/%s` (
31
- Key Uint64,
32
- Value Uint64,
33
- PRIMARY KEY (Key)
34
- )
35
- WITH ( UNIFORM_PARTITIONS = 4 );
36
- )" , databaseName.c_str (), tableName.c_str ())).GetValueSync ();
37
- UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
38
-
39
- TStringBuilder replace;
40
- replace << Sprintf (" REPLACE INTO `Root/%s/%s` (Key, Value) VALUES " ,
41
- databaseName.c_str (), tableName.c_str ());
42
- for (ui32 i = 0 ; i < 4 ; ++i) {
43
- if (i > 0 ) {
44
- replace << " , " ;
45
- }
46
- ui64 value = 4000000000000000000ull * (i + 1 );
47
- replace << Sprintf (" (%" PRIu64 " ul, %" PRIu64 " ul)" , value, value);
48
- }
49
- replace << " ;" ;
50
- result = session.ExecuteDataQuery (replace, TTxControl::BeginTx ().CommitTx ()).GetValueSync ();
51
- UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
52
- }
53
-
54
- void CreateColumnStoreTable (TTestEnv& env, const TString& databaseName, const TString& tableName,
55
- int shardCount)
56
- {
57
- TTableClient client (env.GetDriver ());
58
- auto session = client.CreateSession ().GetValueSync ().GetSession ();
59
-
60
- auto fullTableName = Sprintf (" Root/%s/%s" , databaseName.c_str (), tableName.c_str ());
61
- auto result = session.ExecuteSchemeQuery (Sprintf (R"(
62
- CREATE TABLE `%s` (
63
- Key Uint64 NOT NULL,
64
- Value Uint64,
65
- PRIMARY KEY (Key)
66
- )
67
- PARTITION BY HASH(Key)
68
- WITH (
69
- STORE = COLUMN,
70
- AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d
71
- );
72
- )" , fullTableName.c_str (), shardCount)).GetValueSync ();
73
- UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
74
-
75
- NYdb::TValueBuilder rows;
76
- rows.BeginList ();
77
- for (size_t i = 0 ; i < 100 ; ++i) {
78
- auto key = TValueBuilder ().Uint64 (i).Build ();
79
- auto value = TValueBuilder ().OptionalUint64 (i).Build ();
80
- rows.AddListItem ();
81
- rows.BeginStruct ();
82
- rows.AddMember (" Key" , key);
83
- rows.AddMember (" Value" , value);
84
- rows.EndStruct ();
85
- }
86
- rows.EndList ();
87
-
88
- result = client.BulkUpsert (fullTableName, rows.Build ()).GetValueSync ();
89
- UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
90
- }
91
-
92
- void DropTable (TTestEnv& env, const TString& databaseName, const TString& tableName) {
93
- TTableClient client (env.GetDriver ());
94
- auto session = client.CreateSession ().GetValueSync ().GetSession ();
95
-
96
- auto result = session.ExecuteSchemeQuery (Sprintf (R"(
97
- DROP TABLE `Root/%s/%s`;
98
- )" , databaseName.c_str (), tableName.c_str ())).GetValueSync ();
99
- UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
100
- }
101
-
102
- std::shared_ptr<TCountMinSketch> ExtractCountMin (TTestActorRuntime& runtime, TPathId pathId) {
103
- auto statServiceId = NStat::MakeStatServiceID (runtime.GetNodeId (1 ));
104
-
105
- NStat::TRequest req;
106
- req.PathId = pathId;
107
- req.ColumnTag = 1 ;
108
-
109
- auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
110
- evGet->StatType = NStat::EStatType::COUNT_MIN_SKETCH;
111
- evGet->StatRequests .push_back (req);
112
-
113
- auto sender = runtime.AllocateEdgeActor (1 );
114
- runtime.Send (statServiceId, sender, evGet.release (), 1 , true );
115
- auto evResult = runtime.GrabEdgeEventRethrow <TEvStatistics::TEvGetStatisticsResult>(sender);
116
-
117
- UNIT_ASSERT (evResult);
118
- UNIT_ASSERT (evResult->Get ());
119
- UNIT_ASSERT (evResult->Get ()->StatResponses .size () == 1 );
120
-
121
- auto rsp = evResult->Get ()->StatResponses [0 ];
122
- auto stat = rsp.CountMinSketch ;
123
- UNIT_ASSERT (rsp.Success );
124
- UNIT_ASSERT (stat.CountMin );
125
-
126
- return stat.CountMin ;
127
- }
128
-
129
- void ValidateCountMin (TTestActorRuntime& runtime, TPathId pathId) {
130
- auto countMin = ExtractCountMin (runtime, pathId);
131
-
132
- for (ui32 i = 0 ; i < 4 ; ++i) {
133
- ui64 value = 4000000000000000000ull * (i + 1 );
134
- auto probe = countMin->Probe ((const char *)&value, sizeof (ui64));
135
- UNIT_ASSERT_VALUES_EQUAL (probe, 1 );
136
- }
137
- }
138
-
139
- void ValidateCountMinAbsense (TTestActorRuntime& runtime, TPathId pathId) {
140
- auto statServiceId = NStat::MakeStatServiceID (runtime.GetNodeId (1 ));
141
-
142
- NStat::TRequest req;
143
- req.PathId = pathId;
144
- req.ColumnTag = 1 ;
145
-
146
- auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
147
- evGet->StatType = NStat::EStatType::COUNT_MIN_SKETCH;
148
- evGet->StatRequests .push_back (req);
149
-
150
- auto sender = runtime.AllocateEdgeActor (1 );
151
- runtime.Send (statServiceId, sender, evGet.release (), 1 , true );
152
- auto evResult = runtime.GrabEdgeEventRethrow <TEvStatistics::TEvGetStatisticsResult>(sender);
153
-
154
- UNIT_ASSERT (evResult);
155
- UNIT_ASSERT (evResult->Get ());
156
- UNIT_ASSERT (evResult->Get ()->StatResponses .size () == 1 );
157
-
158
- auto rsp = evResult->Get ()->StatResponses [0 ];
159
- UNIT_ASSERT (!rsp.Success );
160
- }
161
17
162
18
} // namespace
163
19
@@ -175,8 +31,8 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
175
31
runtime.SimulateSleep (TDuration::Seconds (5 ));
176
32
initThread.join ();
177
33
178
- ui64 tabletId ;
179
- auto pathId = ResolvePathId (runtime, " /Root/Database/Table" , nullptr , &tabletId );
34
+ ui64 saTabletId ;
35
+ auto pathId = ResolvePathId (runtime, " /Root/Database/Table" , nullptr , &saTabletId );
180
36
181
37
runtime.SimulateSleep (TDuration::Seconds (30 ));
182
38
@@ -185,7 +41,7 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
185
41
PathIdFromPathId (pathId, record.AddTables ()->MutablePathId ());
186
42
187
43
auto sender = runtime.AllocateEdgeActor ();
188
- runtime.SendToPipe (tabletId , sender, ev.release ());
44
+ runtime.SendToPipe (saTabletId , sender, ev.release ());
189
45
runtime.GrabEdgeEventRethrow <TEvStatistics::TEvAnalyzeResponse>(sender);
190
46
191
47
ValidateCountMin (runtime, pathId);
@@ -208,8 +64,8 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
208
64
// TODO remove sleep
209
65
runtime.SimulateSleep (TDuration::Seconds (30 ));
210
66
211
- ui64 tabletId1 ;
212
- auto pathId1 = ResolvePathId (runtime, " /Root/Database/Table1" , nullptr , &tabletId1 );
67
+ ui64 saTabletId1 ;
68
+ auto pathId1 = ResolvePathId (runtime, " /Root/Database/Table1" , nullptr , &saTabletId1 );
213
69
auto pathId2 = ResolvePathId (runtime, " /Root/Database/Table2" );
214
70
215
71
auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
@@ -218,7 +74,7 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
218
74
PathIdFromPathId (pathId2, record.AddTables ()->MutablePathId ());
219
75
220
76
auto sender = runtime.AllocateEdgeActor ();
221
- runtime.SendToPipe (tabletId1 , sender, ev.release ());
77
+ runtime.SendToPipe (saTabletId1 , sender, ev.release ());
222
78
runtime.GrabEdgeEventRethrow <TEvStatistics::TEvAnalyzeResponse>(sender);
223
79
runtime.GrabEdgeEventRethrow <TEvStatistics::TEvAnalyzeResponse>(sender);
224
80
@@ -335,8 +191,8 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
335
191
runtime.SimulateSleep (TDuration::Seconds (5 ));
336
192
initThread.join ();
337
193
338
- ui64 tabletId = 0 ;
339
- auto pathId = ResolvePathId (runtime, " /Root/Database/Table" , nullptr , &tabletId );
194
+ ui64 saTabletId = 0 ;
195
+ auto pathId = ResolvePathId (runtime, " /Root/Database/Table" , nullptr , &saTabletId );
340
196
341
197
auto init2 = [&] () {
342
198
DropTable (env, " Database" , " Table" );
@@ -351,7 +207,7 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
351
207
PathIdFromPathId (pathId, record.AddTables ()->MutablePathId ());
352
208
353
209
auto sender = runtime.AllocateEdgeActor ();
354
- runtime.SendToPipe (tabletId , sender, ev.release ());
210
+ runtime.SendToPipe (saTabletId , sender, ev.release ());
355
211
356
212
runtime.SimulateSleep (TDuration::Seconds (60 ));
357
213
@@ -370,9 +226,7 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
370
226
runtime.SimulateSleep (TDuration::Seconds (30 ));
371
227
initThread.join ();
372
228
373
- ui64 tabletId = 0 ;
374
- auto pathId = ResolvePathId (runtime, " /Root/Database/Table" , nullptr , &tabletId);
375
- Y_UNUSED (pathId);
229
+ auto pathId = ResolvePathId (runtime, " /Root/Database/Table" );
376
230
377
231
runtime.SimulateSleep (TDuration::Seconds (30 ));
378
232
0 commit comments