Skip to content

Commit 27039c5

Browse files
committed
Call FillLocalKMeans for prefixed index explicitly (#17321)
1 parent f63894c commit 27039c5

File tree

3 files changed

+187
-54
lines changed

3 files changed

+187
-54
lines changed

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3104,6 +3104,98 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
31043104
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
31053105
}
31063106

3107+
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineDistanceNotNullableLevel3) {
3108+
NKikimrConfig::TFeatureFlags featureFlags;
3109+
featureFlags.SetEnableVectorIndex(true);
3110+
auto setting = NKikimrKqp::TKqpSetting();
3111+
auto serverSettings = TKikimrSettings()
3112+
.SetFeatureFlags(featureFlags)
3113+
.SetKqpSettings({setting});
3114+
3115+
TKikimrRunner kikimr(serverSettings);
3116+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
3117+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
3118+
3119+
auto db = kikimr.GetTableClient();
3120+
auto session = DoCreateTableForPrefixedVectorIndex(db, false);
3121+
{
3122+
const TString createIndex(Q_(R"(
3123+
ALTER TABLE `/Root/TestTable`
3124+
ADD INDEX index
3125+
GLOBAL USING vector_kmeans_tree
3126+
ON (user, emb)
3127+
WITH (distance=cosine, vector_type="uint8", vector_dimension=2, levels=3, clusters=2);
3128+
)"));
3129+
3130+
auto result = session.ExecuteSchemeQuery(createIndex)
3131+
.ExtractValueSync();
3132+
3133+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3134+
}
3135+
{
3136+
auto result = session.DescribeTable("/Root/TestTable").ExtractValueSync();
3137+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
3138+
const auto& indexes = result.GetTableDescription().GetIndexDescriptions();
3139+
UNIT_ASSERT_EQUAL(indexes.size(), 1);
3140+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexName(), "index");
3141+
std::vector<std::string> indexKeyColumns{"user", "emb"};
3142+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexColumns(), indexKeyColumns);
3143+
const auto& settings = std::get<TKMeansTreeSettings>(indexes[0].GetIndexSettings());
3144+
UNIT_ASSERT_EQUAL(settings.Settings.Metric, NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance);
3145+
UNIT_ASSERT_EQUAL(settings.Settings.VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8);
3146+
UNIT_ASSERT_EQUAL(settings.Settings.VectorDimension, 2);
3147+
UNIT_ASSERT_EQUAL(settings.Levels, 3);
3148+
UNIT_ASSERT_EQUAL(settings.Clusters, 2);
3149+
}
3150+
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
3151+
}
3152+
3153+
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineDistanceNotNullableLevel4) {
3154+
NKikimrConfig::TFeatureFlags featureFlags;
3155+
featureFlags.SetEnableVectorIndex(true);
3156+
auto setting = NKikimrKqp::TKqpSetting();
3157+
auto serverSettings = TKikimrSettings()
3158+
.SetFeatureFlags(featureFlags)
3159+
.SetKqpSettings({setting});
3160+
3161+
TKikimrRunner kikimr(serverSettings);
3162+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
3163+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
3164+
3165+
auto db = kikimr.GetTableClient();
3166+
auto session = DoCreateTableForPrefixedVectorIndex(db, false);
3167+
{
3168+
const TString createIndex(Q_(R"(
3169+
ALTER TABLE `/Root/TestTable`
3170+
ADD INDEX index
3171+
GLOBAL USING vector_kmeans_tree
3172+
ON (user, emb)
3173+
WITH (distance=cosine, vector_type="uint8", vector_dimension=2, levels=4, clusters=2);
3174+
)"));
3175+
3176+
auto result = session.ExecuteSchemeQuery(createIndex)
3177+
.ExtractValueSync();
3178+
3179+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3180+
}
3181+
{
3182+
auto result = session.DescribeTable("/Root/TestTable").ExtractValueSync();
3183+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
3184+
const auto& indexes = result.GetTableDescription().GetIndexDescriptions();
3185+
UNIT_ASSERT_EQUAL(indexes.size(), 1);
3186+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexName(), "index");
3187+
std::vector<std::string> indexKeyColumns{"user", "emb"};
3188+
UNIT_ASSERT_EQUAL(indexes[0].GetIndexColumns(), indexKeyColumns);
3189+
const auto& settings = std::get<TKMeansTreeSettings>(indexes[0].GetIndexSettings());
3190+
UNIT_ASSERT_EQUAL(settings.Settings.Metric, NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance);
3191+
UNIT_ASSERT_EQUAL(settings.Settings.VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8);
3192+
UNIT_ASSERT_EQUAL(settings.Settings.VectorDimension, 2);
3193+
UNIT_ASSERT_EQUAL(settings.Levels, 4);
3194+
UNIT_ASSERT_EQUAL(settings.Clusters, 2);
3195+
}
3196+
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
3197+
}
3198+
31073199
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineSimilarityNotNullableLevel2) {
31083200
NKikimrConfig::TFeatureFlags featureFlags;
31093201
featureFlags.SetEnableVectorIndex(true);

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

Lines changed: 71 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateBuildPropose(
370370
static constexpr std::string_view LogPrefix = "Create build table boundaries for ";
371371
LOG_D(buildInfo.Id << " table " << suffix
372372
<< ", count: " << count << ", parts: " << parts << ", step: " << step
373-
<< ", kmeans: " << buildInfo.KMeansTreeToDebugStr());
373+
<< ", " << buildInfo.DebugString());
374374
if (parts > 1) {
375375
const auto from = buildInfo.KMeans.ChildBegin;
376376
for (auto i = from + step, e = from + count; i < e; i += step) {
@@ -699,7 +699,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
699699
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
700700
}
701701

702-
void SendBuildIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
702+
void SendBuildSecondaryIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
703703
auto ev = MakeHolder<TEvDataShard::TEvBuildIndexCreateRequest>();
704704
ev->Record.SetBuildIndexId(ui64(BuildId));
705705

@@ -808,12 +808,20 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
808808
}
809809
}
810810

811-
bool FillTable(TIndexBuildInfo& buildInfo) {
811+
bool FillSecondaryIndex(TIndexBuildInfo& buildInfo) {
812+
LOG_D("FillSecondaryIndex Start");
813+
812814
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
813815
AddAllShards(buildInfo);
814816
}
815-
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildIndexRequest(shardIdx, buildInfo); }) &&
817+
auto done = SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildSecondaryIndexRequest(shardIdx, buildInfo); }) &&
816818
buildInfo.DoneShards.size() == buildInfo.Shards.size();
819+
820+
if (done) {
821+
LOG_D("FillSecondaryIndex Done");
822+
}
823+
824+
return done;
817825
}
818826

819827
bool FillPrefixKMeans(TIndexBuildInfo& buildInfo) {
@@ -824,6 +832,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
824832
buildInfo.DoneShards.size() == buildInfo.Shards.size();
825833
}
826834

835+
bool FillLocalKMeans(TIndexBuildInfo& buildInfo) {
836+
if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
837+
AddAllShards(buildInfo);
838+
}
839+
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansLocalRequest(shardIdx, buildInfo); }) &&
840+
buildInfo.DoneShards.size() == buildInfo.Shards.size();
841+
}
842+
827843
bool InitSingleKMeans(TIndexBuildInfo& buildInfo) {
828844
if (!buildInfo.DoneShards.empty() || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()) {
829845
return false;
@@ -935,102 +951,112 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
935951
);
936952
}
937953

938-
bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
939-
if (buildInfo.IsBuildPrefixedVectorIndex() && buildInfo.KMeans.Level == 1) {
940-
LOG_D("FillIndex::Prefixed::Level1::Start " << buildInfo.KMeansTreeToDebugStr());
941-
if (!FillTable(buildInfo)) {
954+
bool FillPrefixedVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
955+
LOG_D("FillPrefixedVectorIndex Start " << buildInfo.DebugString());
956+
957+
if (buildInfo.KMeans.Level == 1) {
958+
if (!FillSecondaryIndex(buildInfo)) {
942959
return false;
943960
}
944-
const ui64 doneShards = buildInfo.DoneShards.size();
961+
LOG_D("FillPrefixedVectorIndex DoneLevel " << buildInfo.DebugString());
945962

963+
const ui64 doneShards = buildInfo.DoneShards.size();
946964
ClearDoneShards(txc, buildInfo);
947965
// it's approximate but upper bound, so it's ok
948966
buildInfo.KMeans.TableSize = std::max<ui64>(1, buildInfo.Processed.GetUploadRows());
949967
buildInfo.KMeans.PrefixIndexDone(doneShards);
968+
LOG_D("FillPrefixedVectorIndex PrefixIndexDone " << buildInfo.DebugString());
969+
950970
PersistKMeansState(txc, buildInfo);
951971
NIceDb::TNiceDb db{txc.DB};
952972
Self->PersistBuildIndexUploadReset(db, buildInfo);
953-
LOG_D("FillIndex::Prefixed::Level1::Done " << buildInfo.KMeansTreeToDebugStr());
954973
ChangeState(BuildId, TIndexBuildInfo::EState::CreateBuild);
955974
Progress(BuildId);
956975
return false;
957-
}
958-
959-
if (buildInfo.IsBuildPrefixedVectorIndex() && buildInfo.KMeans.Level == 2) {
960-
LOG_D("FillIndex::Prefixed::Level2::Start " << buildInfo.KMeansTreeToDebugStr());
961-
if (!FillPrefixKMeans(buildInfo)) {
976+
} else {
977+
bool filled = buildInfo.KMeans.Level == 2
978+
? FillPrefixKMeans(buildInfo)
979+
: FillLocalKMeans(buildInfo);
980+
if (!filled) {
962981
return false;
963982
}
983+
LOG_D("FillPrefixedVectorIndex DoneLevel " << buildInfo.DebugString());
964984

965985
ClearDoneShards(txc, buildInfo);
966986
Y_ASSERT(buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::MultiLocal);
967987
const bool needsAnotherLevel = buildInfo.KMeans.NextLevel();
968988
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::MultiLocal;
969-
buildInfo.KMeans.Parent = buildInfo.KMeans.ParentEnd();
989+
if (buildInfo.KMeans.Level == 2) {
990+
buildInfo.KMeans.Parent = buildInfo.KMeans.ParentEnd();
991+
}
992+
LOG_D("FillPrefixedVectorIndex NextLevel " << buildInfo.DebugString());
993+
970994
PersistKMeansState(txc, buildInfo);
971995
NIceDb::TNiceDb db{txc.DB};
972996
Self->PersistBuildIndexUploadReset(db, buildInfo);
973-
LOG_D("FillIndex::Prefixed::Level2::Done " << buildInfo.KMeansTreeToDebugStr());
974997
if (!needsAnotherLevel) {
998+
LOG_D("FillPrefixedVectorIndex Done " << buildInfo.DebugString());
975999
return true;
9761000
}
9771001
ChangeState(BuildId, TIndexBuildInfo::EState::DropBuild);
9781002
Progress(BuildId);
9791003
return false;
9801004
}
1005+
}
1006+
1007+
bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
1008+
LOG_D("FillVectorIndex Start " << buildInfo.DebugString());
9811009

9821010
if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Upload) {
9831011
return false;
9841012
}
9851013
if (InitSingleKMeans(buildInfo)) {
986-
LOG_D("FillIndex::SingleKMeans::Start " << buildInfo.KMeansTreeToDebugStr());
1014+
LOG_D("FillVectorIndex SingleKMeans " << buildInfo.DebugString());
9871015
}
9881016
if (!SendVectorIndex(buildInfo)) {
9891017
return false;
9901018
}
9911019

992-
LOG_D("FillIndex::SendVectorIndex::Done " << buildInfo.KMeansTreeToDebugStr());
9931020
if (!buildInfo.Sample.Rows.empty()) {
9941021
if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Collect) {
995-
LOG_D("FillIndex::SendSample::Start " << buildInfo.KMeansTreeToDebugStr());
1022+
LOG_D("FillVectorIndex SendUploadSampleKRequest " << buildInfo.DebugString());
9961023
SendUploadSampleKRequest(buildInfo);
9971024
return false;
9981025
}
999-
LOG_D("FillIndex::SendSample::Done " << buildInfo.KMeansTreeToDebugStr());
10001026
}
10011027

1002-
LOG_D("FillIndex::ClearDoneShards " << buildInfo.KMeansTreeToDebugStr());
1028+
LOG_D("FillVectorIndex DoneLevel " << buildInfo.DebugString());
10031029
ClearDoneShards(txc, buildInfo);
10041030

10051031
if (!buildInfo.Sample.Rows.empty()) {
10061032
if (buildInfo.KMeans.NextState()) {
1007-
LOG_D("FillIndex::NextState::Start " << buildInfo.KMeansTreeToDebugStr());
1033+
LOG_D("FillVectorIndex NextState " << buildInfo.DebugString());
10081034
PersistKMeansState(txc, buildInfo);
10091035
Progress(BuildId);
10101036
return false;
10111037
}
10121038
buildInfo.Sample.Clear();
10131039
NIceDb::TNiceDb db{txc.DB};
10141040
Self->PersistBuildIndexSampleForget(db, buildInfo);
1015-
LOG_D("FillIndex::NextState::Done " << buildInfo.KMeansTreeToDebugStr());
1041+
LOG_D("FillVectorIndex DoneState " << buildInfo.DebugString());
10161042
}
10171043

10181044
if (buildInfo.KMeans.NextParent()) {
1019-
LOG_D("FillIndex::NextParent::Start " << buildInfo.KMeansTreeToDebugStr());
1045+
LOG_D("FillVectorIndex NextParent " << buildInfo.DebugString());
10201046
PersistKMeansState(txc, buildInfo);
10211047
Progress(BuildId);
10221048
return false;
10231049
}
10241050

10251051
if (InitMultiKMeans(buildInfo)) {
1026-
LOG_D("FillIndex::MultiKMeans::Start " << buildInfo.KMeansTreeToDebugStr());
1052+
LOG_D("FillVectorIndex MultiKMeans " << buildInfo.DebugString());
10271053
PersistKMeansState(txc, buildInfo);
10281054
Progress(BuildId);
10291055
return false;
10301056
}
10311057

10321058
if (buildInfo.KMeans.NextLevel()) {
1033-
LOG_D("FillIndex::NextLevel::Start " << buildInfo.KMeansTreeToDebugStr());
1059+
LOG_D("FillVectorIndex NextLevel " << buildInfo.DebugString());
10341060
PersistKMeansState(txc, buildInfo);
10351061
NIceDb::TNiceDb db{txc.DB};
10361062
Self->PersistBuildIndexUploadReset(db, buildInfo);
@@ -1040,7 +1066,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
10401066
Progress(BuildId);
10411067
return false;
10421068
}
1043-
LOG_D("FillIndex::Done " << buildInfo.KMeansTreeToDebugStr());
1069+
LOG_D("FillVectorIndex Done " << buildInfo.DebugString());
10441070
return true;
10451071
}
10461072

@@ -1058,15 +1084,20 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
10581084
Y_ABORT_UNLESS(buildInfo.SnapshotStep);
10591085
}
10601086
if (buildInfo.Shards.empty()) {
1061-
LOG_D("FillIndex::InitiateShards " << buildInfo.KMeansTreeToDebugStr());
10621087
NIceDb::TNiceDb db(txc.DB);
10631088
InitiateShards(db, buildInfo);
10641089
}
1065-
if (buildInfo.IsBuildVectorIndex()) {
1066-
return FillVectorIndex(txc, buildInfo);
1067-
} else {
1068-
Y_ASSERT(buildInfo.IsBuildSecondaryIndex() || buildInfo.IsBuildColumns());
1069-
return FillTable(buildInfo);
1090+
switch (buildInfo.BuildKind) {
1091+
case TIndexBuildInfo::EBuildKind::BuildSecondaryIndex:
1092+
case TIndexBuildInfo::EBuildKind::BuildColumns:
1093+
return FillSecondaryIndex(buildInfo);
1094+
case TIndexBuildInfo::EBuildKind::BuildVectorIndex:
1095+
return FillVectorIndex(txc, buildInfo);
1096+
case TIndexBuildInfo::EBuildKind::BuildPrefixedVectorIndex:
1097+
return FillPrefixedVectorIndex(txc, buildInfo);
1098+
default:
1099+
Y_ASSERT(false);
1100+
return true;
10701101
}
10711102
}
10721103

@@ -1303,6 +1334,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
13031334
}
13041335

13051336
void InitiateShards(NIceDb::TNiceDb& db, TIndexBuildInfo& buildInfo) {
1337+
LOG_D("InitiateShards " << buildInfo.DebugString());
1338+
13061339
Y_ASSERT(buildInfo.Shards.empty());
13071340
Y_ASSERT(buildInfo.ToUploadShards.empty());
13081341
Y_ASSERT(buildInfo.InProgressShards.empty());
@@ -1318,15 +1351,16 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
13181351
auto tableColumns = NTableIndex::ExtractInfo(table); // skip dropped columns
13191352
TSerializedTableRange shardRange = InfiniteRange(tableColumns.Keys.size());
13201353
static constexpr std::string_view LogPrefix = "";
1321-
LOG_D("infinite range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));
13221354

13231355
buildInfo.Cluster2Shards.clear();
13241356
for (const auto& x: table->GetPartitions()) {
13251357
Y_ABORT_UNLESS(Self->ShardInfos.contains(x.ShardIdx));
13261358
TSerializedCellVec bound{x.EndOfRange};
13271359
shardRange.To = bound;
1328-
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));
1329-
buildInfo.AddParent(shardRange, x.ShardIdx);
1360+
if (buildInfo.BuildKind == TIndexBuildInfo::EBuildKind::BuildVectorIndex) {
1361+
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange));
1362+
buildInfo.AddParent(shardRange, x.ShardIdx);
1363+
}
13301364
auto [it, emplaced] = buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), "", buildInfo.Shards.size()});
13311365
Y_ASSERT(emplaced);
13321366
shardRange.From = std::move(bound);

0 commit comments

Comments
 (0)