Skip to content

Commit b698447

Browse files
vitalifkunga
authored and committed
Fix a bug where KMeans clusters could change on schemeshard restart (#18979) (#19004)
1 parent 3bb5eef commit b698447

File tree

5 files changed

+44
-39
lines changed

5 files changed

+44
-39
lines changed

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

Lines changed: 30 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -1021,7 +1021,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
10211021
return false;
10221022
}
10231023

1024-
if (!buildInfo.Sample.Rows.empty()) {
1024+
if (buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::Sample &&
1025+
!buildInfo.Sample.Rows.empty()) {
10251026
if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Collect) {
10261027
LOG_D("FillVectorIndex SendUploadSampleKRequest " << buildInfo.DebugString());
10271028
SendUploadSampleKRequest(buildInfo);
@@ -1033,7 +1034,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
10331034
ClearDoneShards(txc, buildInfo);
10341035

10351036
if (!buildInfo.Sample.Rows.empty()) {
1036-
if (buildInfo.KMeans.NextState()) {
1037+
if (buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::Sample) {
1038+
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Reshuffle;
10371039
LOG_D("FillVectorIndex NextState " << buildInfo.DebugString());
10381040
PersistKMeansState(txc, buildInfo);
10391041
Progress(BuildId);
@@ -1603,29 +1605,6 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
16031605
}
16041606

16051607
NIceDb::TNiceDb db(txc.DB);
1606-
if (record.ProbabilitiesSize()) {
1607-
Y_ENSURE(record.RowsSize());
1608-
auto& probabilities = record.GetProbabilities();
1609-
auto& rows = *record.MutableRows();
1610-
Y_ENSURE(probabilities.size() == rows.size());
1611-
auto& sample = buildInfo.Sample.Rows;
1612-
auto from = sample.size();
1613-
for (int i = 0; i != probabilities.size(); ++i) {
1614-
if (probabilities[i] >= buildInfo.Sample.MaxProbability) {
1615-
break;
1616-
}
1617-
sample.emplace_back(probabilities[i], std::move(rows[i]));
1618-
}
1619-
if (buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K)) {
1620-
from = 0;
1621-
}
1622-
for (; from < sample.size(); ++from) {
1623-
db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Update(
1624-
NIceDb::TUpdate<Schema::KMeansTreeSample::Probability>(sample[from].P),
1625-
NIceDb::TUpdate<Schema::KMeansTreeSample::Data>(sample[from].Row)
1626-
);
1627-
}
1628-
}
16291608

16301609
TBillingStats stats{0, 0, record.GetReadRows(), record.GetReadBytes()};
16311610
shardStatus.Processed += stats;
@@ -1639,6 +1618,32 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
16391618
switch (shardStatus.Status) {
16401619
case NKikimrIndexBuilder::EBuildStatus::DONE:
16411620
if (buildInfo.InProgressShards.erase(shardIdx)) {
1621+
if (record.ProbabilitiesSize()) {
1622+
Y_ENSURE(record.RowsSize());
1623+
auto& probabilities = record.GetProbabilities();
1624+
auto& rows = *record.MutableRows();
1625+
Y_ENSURE(probabilities.size() == rows.size());
1626+
auto& sample = buildInfo.Sample.Rows;
1627+
auto from = sample.size();
1628+
for (int i = 0; i != probabilities.size(); ++i) {
1629+
if (probabilities[i] >= buildInfo.Sample.MaxProbability) {
1630+
break;
1631+
}
1632+
sample.emplace_back(probabilities[i], std::move(rows[i]));
1633+
}
1634+
if (buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K)) {
1635+
from = 0;
1636+
}
1637+
for (; from < sample.size(); ++from) {
1638+
db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Update(
1639+
NIceDb::TUpdate<Schema::KMeansTreeSample::Probability>(sample[from].P),
1640+
NIceDb::TUpdate<Schema::KMeansTreeSample::Data>(sample[from].Row)
1641+
);
1642+
}
1643+
for (; from < 2*buildInfo.KMeans.K; ++from) {
1644+
db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Delete();
1645+
}
1646+
}
16421647
buildInfo.DoneShards.emplace_back(shardIdx);
16431648
}
16441649
break;

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 0 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -3136,17 +3136,6 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
31363136
bool NeedsAnotherParent() const noexcept {
31373137
return Parent < ParentEnd();
31383138
}
3139-
bool NeedsAnotherState() const noexcept {
3140-
return State == Sample /*|| State == Recompute*/;
3141-
}
3142-
3143-
bool NextState() noexcept {
3144-
if (!NeedsAnotherState()) {
3145-
return false;
3146-
}
3147-
State = static_cast<EState>(static_cast<ui32>(State) + 1);
3148-
return true;
3149-
}
31503139

31513140
bool NextParent() noexcept {
31523141
if (!NeedsAnotherParent()) {

ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@
66
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
77
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
88
#include <ydb/core/testlib/tablet_helpers.h>
9-
#include <ydb/public/lib/deprecated/kicli/kicli.h>
109

1110

1211
using namespace NKikimr;

ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -188,7 +188,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
188188
auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", buildIndexId);
189189
UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE);
190190

191-
const TString meteringData = R"({"usage":{"start":0,"quantity":431,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"109-72075186233409549-2-0-0-0-0-619-605-11328-10960","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n";
191+
const TString meteringData = R"({"usage":{"start":0,"quantity":433,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"109-72075186233409549-2-0-0-0-0-611-609-11032-11108","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n";
192192

193193
UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData);
194194

ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,12 +26,16 @@ Y_UNIT_TEST_SUITE(VectorIndexBuildTestReboots) {
2626
KeyColumnNames: ["key"]
2727
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 50 } } } }
2828
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 150 } } } }
29+
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 250 } } } }
30+
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 350 } } } }
2931
)");
3032
t.TestEnv->TestWaitNotification(runtime, t.TxId);
3133

3234
WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 0, 0, 50);
3335
WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 1, 50, 150);
34-
WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 2, 150, 200);
36+
WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 2, 150, 250);
37+
WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 3, 250, 350);
38+
WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 4, 350, 400);
3539
}
3640

3741
AsyncBuildVectorIndex(runtime, ++t.TxId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/dir/Table", "index1", "embedding", {"value"});
@@ -68,6 +72,14 @@ Y_UNIT_TEST_SUITE(VectorIndexBuildTestReboots) {
6872
{NLs::PathNotExist});
6973
TestDescribeResult(DescribePath(runtime, indexPath + "/" + PostingTable + BuildSuffix1, true, true, true),
7074
{NLs::PathNotExist});
75+
76+
// Check row count in the posting table
77+
{
78+
auto rows = CountRows(runtime, TTestTxConfig::SchemeShard, "/MyRoot/dir/Table/index1/indexImplPostingTable");
79+
Cerr << "... posting table contains " << rows << " rows" << Endl;
80+
UNIT_ASSERT_VALUES_EQUAL(rows, 400);
81+
}
82+
7183
}
7284
});
7385
}

0 commit comments

Comments
 (0)