Skip to content

Commit 78bb468

Browse files
count-min-sketch as index (#7978)
1 parent 48bd4dd commit 78bb468

File tree

25 files changed

+610
-56
lines changed

25 files changed

+610
-56
lines changed

ydb/core/formats/arrow/protos/ssa.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ message TProgram {
4545
repeated uint64 HashValues = 1;
4646
}
4747

48+
message TCountMinSketchChecker {
49+
}
50+
4851
message TOlapIndexChecker {
4952
optional uint32 IndexId = 1;
5053
optional string ClassName = 2;
@@ -56,6 +59,7 @@ message TProgram {
5659
oneof Implementation {
5760
TBloomFilterChecker BloomFilter = 40;
5861
TCompositeChecker Composite = 41;
62+
TCountMinSketchChecker CountMinSketch = 42;
5963
}
6064
}
6165

ydb/core/kqp/ut/olap/indexes_ut.cpp

Lines changed: 133 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
77
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
88

9+
#include <ydb/core/statistics/events.h>
10+
911
#include <library/cpp/testing/unittest/registar.h>
1012

1113
namespace NKikimr::NKqp {
@@ -59,7 +61,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
5961

6062
{
6163
auto alterQuery = TStringBuilder() <<
62-
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
64+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
6365
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
6466
)";
6567
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -68,7 +70,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
6870
}
6971
{
7072
auto alterQuery = TStringBuilder() <<
71-
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
73+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
7274
FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`);
7375
)";
7476
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -105,6 +107,129 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
105107
}
106108
}
107109

110+
Y_UNIT_TEST(CountMinSketchIndex) {
111+
auto settings = TKikimrSettings()
112+
.SetWithSampleTables(false);
113+
TKikimrRunner kikimr(settings);
114+
115+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
116+
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
117+
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
118+
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
119+
120+
TLocalHelper(kikimr).CreateTestOlapTable();
121+
auto tableClient = kikimr.GetTableClient();
122+
123+
Tests::NCommon::TLoggerInit(kikimr).SetComponents({NKikimrServices::TX_COLUMNSHARD}, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();
124+
125+
{
126+
auto alterQuery = TStringBuilder() <<
127+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_ts, TYPE=COUNT_MIN_SKETCH,
128+
FEATURES=`{"column_names" : ['timestamp']}`);
129+
)";
130+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
131+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
132+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
133+
}
134+
135+
{
136+
auto alterQuery = TStringBuilder() <<
137+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_res_id, TYPE=COUNT_MIN_SKETCH,
138+
FEATURES=`{"column_names" : ['resource_id']}`);
139+
)";
140+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
141+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
142+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
143+
}
144+
145+
{
146+
auto alterQuery = TStringBuilder() <<
147+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_uid, TYPE=COUNT_MIN_SKETCH,
148+
FEATURES=`{"column_names" : ['uid']}`);
149+
)";
150+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
151+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
152+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
153+
}
154+
155+
{
156+
auto alterQuery = TStringBuilder() <<
157+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_level, TYPE=COUNT_MIN_SKETCH,
158+
FEATURES=`{"column_names" : ['level']}`);
159+
)";
160+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
161+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
162+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
163+
}
164+
165+
{
166+
auto alterQuery = TStringBuilder() <<
167+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_message, TYPE=COUNT_MIN_SKETCH,
168+
FEATURES=`{"column_names" : ['message']}`);
169+
)";
170+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
171+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
172+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
173+
}
174+
175+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
176+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
177+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
178+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
179+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
180+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
181+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);
182+
183+
csController->WaitActualization(TDuration::Seconds(10));
184+
{
185+
auto runtime = kikimr.GetTestServer().GetRuntime();
186+
auto sender = runtime->AllocateEdgeActor();
187+
188+
TAutoPtr<IEventHandle> handle;
189+
190+
size_t shard = 0;
191+
std::set<ui64> pathids;
192+
for (auto&& i : csController->GetShardActualIds()) {
193+
Cerr << ">>> shard actual id: " << i << Endl;
194+
for (auto&& j : csController->GetPathIds(i)) {
195+
Cerr << ">>> path id: " << j << Endl;
196+
pathids.insert(j);
197+
}
198+
if (++shard == 3)
199+
break;
200+
}
201+
202+
UNIT_ASSERT(pathids.size() == 1);
203+
ui64 pathId = *pathids.begin();
204+
205+
shard = 0;
206+
for (auto&& i : csController->GetShardActualIds()) {
207+
auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
208+
request->Record.MutableTable()->MutablePathId()->SetLocalId(pathId);
209+
210+
runtime->Send(MakePipePerNodeCacheID(false), sender, new TEvPipeCache::TEvForward(
211+
request.release(), i, false));
212+
if (++shard == 3)
213+
break;
214+
}
215+
216+
auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::Create());
217+
for (size_t shard = 0; shard < 3; ++shard) {
218+
auto event = runtime->GrabEdgeEvent<NStat::TEvStatistics::TEvStatisticsResponse>(handle);
219+
UNIT_ASSERT(event);
220+
221+
auto& response = event->Record;
222+
// Cerr << response << Endl;
223+
UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS);
224+
UNIT_ASSERT(response.ColumnsSize() == 5);
225+
TString someData = response.GetColumns(0).GetStatistics(0).GetData();
226+
*sketch += *std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(someData.data(), someData.size()));
227+
Cerr << ">>> sketch.GetElementCount() = " << sketch->GetElementCount() << Endl;
228+
UNIT_ASSERT(sketch->GetElementCount() > 0);
229+
}
230+
}
231+
}
232+
108233
Y_UNIT_TEST(SchemeActualizationOnceOnStart) {
109234
auto settings = TKikimrSettings()
110235
.SetWithSampleTables(false);
@@ -194,7 +319,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
194319

195320
{
196321
auto alterQuery = TStringBuilder() << Sprintf(
197-
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
322+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
198323
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
199324
)", StorageId.data());
200325
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -203,7 +328,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
203328
}
204329
{
205330
auto alterQuery = TStringBuilder() << Sprintf(
206-
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
331+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
207332
FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
208333
)", StorageId.data()
209334
);
@@ -347,7 +472,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
347472

348473
{
349474
auto alterQuery = TStringBuilder() <<
350-
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
475+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
351476
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
352477
)";
353478
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -357,7 +482,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
357482

358483
{
359484
auto alterQuery = TStringBuilder() <<
360-
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
485+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
361486
FEATURES=`{"column_names" : ["uid", "resource_id"], "false_positive_probability" : 0.05}`);
362487
)";
363488
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -367,7 +492,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
367492

368493
{
369494
auto alterQuery = TStringBuilder() <<
370-
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
495+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
371496
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.005}`);
372497
)";
373498
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -377,7 +502,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
377502

378503
{
379504
auto alterQuery = TStringBuilder() <<
380-
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
505+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
381506
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.01}`);
382507
)";
383508
auto session = tableClient.CreateSession().GetValueSync().GetSession();

ydb/core/protos/flat_scheme_op.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,11 @@ message TRequestedMaxIndex {
482482
optional string ColumnName = 1;
483483
}
484484

485+
message TRequestedCountMinSketch {
486+
// sketch built on the combined data from the set of columns
487+
repeated string ColumnNames = 1;
488+
}
489+
485490
message TOlapIndexRequested {
486491
optional string Name = 1;
487492
optional TCompressionOptions Compression = 3;
@@ -491,6 +496,7 @@ message TOlapIndexRequested {
491496
oneof Implementation {
492497
TRequestedBloomFilter BloomFilter = 40;
493498
TRequestedMaxIndex MaxIndex = 41;
499+
TRequestedCountMinSketch CountMinSketch = 42;
494500
}
495501
}
496502

@@ -504,6 +510,10 @@ message TMaxIndex {
504510
optional uint32 ColumnId = 1;
505511
}
506512

513+
message TCountMinSketch {
514+
repeated uint32 ColumnIds = 1;
515+
}
516+
507517
message TOlapIndexDescription {
508518
// This id is auto-generated by schemeshard
509519
optional uint32 Id = 1;
@@ -517,6 +527,7 @@ message TOlapIndexDescription {
517527
oneof Implementation {
518528
TBloomFilter BloomFilter = 41;
519529
TMaxIndex MaxIndex = 42;
530+
TCountMinSketch CountMinSketch = 43;
520531
}
521532
}
522533

ydb/core/protos/out/out.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrDataEvents::TEvWrite::ETxMode, stream, value) {
243243
Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvAnalyzeStatusResponse_EStatus, stream, value) {
244244
stream << NKikimrStat::TEvAnalyzeStatusResponse_EStatus_Name(value);
245245
}
246+
247+
Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value) {
248+
stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
249+
}

0 commit comments

Comments
 (0)