Skip to content

Commit 3c2e65a

Browse files
authored
kmeans tree vector index search (#12639)
1 parent ac47d76 commit 3c2e65a

21 files changed

+774
-158
lines changed

ydb/core/base/table_index.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,19 @@ bool Contains(const auto& names, std::string_view str) {
2828
return std::find(std::begin(names), std::end(names), str) != std::end(names);
2929
}
3030

31-
constexpr std::string_view ImplTables[] = {ImplTable, NTableVectorKmeansTreeIndex::LevelTable, NTableVectorKmeansTreeIndex::PostingTable};
31+
constexpr std::string_view ImplTables[] = {
32+
ImplTable, NTableVectorKmeansTreeIndex::LevelTable, NTableVectorKmeansTreeIndex::PostingTable,
33+
};
34+
35+
constexpr std::string_view GlobalSecondaryImplTables[] = {
36+
ImplTable,
37+
};
38+
static_assert(std::is_sorted(std::begin(GlobalSecondaryImplTables), std::end(GlobalSecondaryImplTables)));
39+
40+
constexpr std::string_view GlobalKMeansTreeImplTables[] = {
41+
NTableVectorKmeansTreeIndex::LevelTable, NTableVectorKmeansTreeIndex::PostingTable,
42+
};
43+
static_assert(std::is_sorted(std::begin(GlobalKMeansTreeImplTables), std::end(GlobalKMeansTreeImplTables)));
3244

3345
}
3446

@@ -142,11 +154,11 @@ bool IsCompatibleIndex(NKikimrSchemeOp::EIndexType indexType, const TTableColumn
142154
return true;
143155
}
144156

145-
TVector<TString> GetImplTables(NKikimrSchemeOp::EIndexType indexType) {
157+
std::span<const std::string_view> GetImplTables(NKikimrSchemeOp::EIndexType indexType) {
146158
if (indexType == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree) {
147-
return { NTableVectorKmeansTreeIndex::LevelTable, NTableVectorKmeansTreeIndex::PostingTable };
159+
return GlobalKMeansTreeImplTables;
148160
} else {
149-
return { ImplTable };
161+
return GlobalSecondaryImplTables;
150162
}
151163
}
152164

ydb/core/base/table_index.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
#include <util/generic/string.h>
88
#include <util/string/builder.h>
99

10+
#include <span>
11+
#include <string_view>
12+
1013
namespace NKikimr::NTableIndex {
1114

1215
struct TTableColumns {
@@ -24,7 +27,7 @@ inline constexpr const char* ImplTable = "indexImplTable";
2427
bool IsCompatibleIndex(NKikimrSchemeOp::EIndexType type, const TTableColumns& table, const TIndexColumns& index, TString& explain);
2528
TTableColumns CalcTableImplDescription(NKikimrSchemeOp::EIndexType type, const TTableColumns& table, const TIndexColumns& index);
2629

27-
TVector<TString> GetImplTables(NKikimrSchemeOp::EIndexType indexType);
30+
std::span<const std::string_view> GetImplTables(NKikimrSchemeOp::EIndexType indexType);
2831
bool IsImplTable(std::string_view tableName);
2932
bool IsBuildImplTable(std::string_view tableName);
3033

ydb/core/kqp/gateway/kqp_metadata_loader.cpp

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "actors/kqp_ic_gateway_actors.h"
33

44
#include <ydb/core/base/path.h>
5+
#include <ydb/core/base/table_index.h>
56
#include <ydb/core/external_sources/external_source_factory.h>
67
#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
78
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
@@ -175,7 +176,7 @@ TTableMetadataResult GetTableMetadataResult(const NSchemeCache::TSchemeCacheNavi
175176
THashMap<TString, NYql::TKikimrPathId> sequences;
176177

177178
for (const auto& sequenceDesc : entry.Sequences) {
178-
sequences[sequenceDesc.GetName()] =
179+
sequences[sequenceDesc.GetName()] =
179180
NYql::TKikimrPathId(sequenceDesc.GetPathId().GetOwnerId(), sequenceDesc.GetPathId().GetLocalId());
180181
}
181182

@@ -187,7 +188,7 @@ TTableMetadataResult GetTableMetadataResult(const NSchemeCache::TSchemeCacheNavi
187188
const TString typeName = GetTypeName(NScheme::TTypeInfoMod{columnDesc.PType, columnDesc.PTypeMod});
188189
auto defaultKind = NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_UNSPECIFIED;
189190
NYql::TKikimrPathId defaultFromSequencePathId = {};
190-
191+
191192
if (columnDesc.IsDefaultFromSequence()) {
192193
defaultKind = NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE;
193194
auto sequenceIt = sequences.find(columnDesc.DefaultFromSequence);
@@ -196,7 +197,7 @@ TTableMetadataResult GetTableMetadataResult(const NSchemeCache::TSchemeCacheNavi
196197
} else if (columnDesc.IsDefaultFromLiteral()) {
197198
defaultKind = NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_LITERAL;
198199
}
199-
200+
200201
tableMeta->Columns.emplace(
201202
columnDesc.Name,
202203
NYql::TKikimrColumnMetadata(
@@ -400,11 +401,15 @@ TString GetDebugString(const std::pair<NKikimr::TIndexId, TString>& id) {
400401
return TStringBuilder() << " Path: " << id.second << " TableId: " << id.first;
401402
}
402403

403-
void UpdateMetadataIfSuccess(NYql::TKikimrTableMetadataPtr ptr, size_t idx, const TTableMetadataResult& value) {
404-
if (value.Success()) {
405-
ptr->SecondaryGlobalIndexMetadata[idx] = value.Metadata;
404+
void UpdateMetadataIfSuccess(NYql::TKikimrTableMetadataPtr& implTable, TTableMetadataResult& value) {
405+
YQL_ENSURE(value.Success());
406+
if (!implTable) {
407+
implTable = std::move(value.Metadata);
408+
return;
406409
}
407-
410+
YQL_ENSURE(!implTable->Next);
411+
YQL_ENSURE(implTable->Name < value.Metadata->Name);
412+
implTable->Next = std::move(value.Metadata);
408413
}
409414

410415
void SetError(TTableMetadataResult& externalDataSourceMetadata, const TString& error) {
@@ -618,28 +623,21 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadIndexMeta
618623
const auto& tableName = tableMetadata->Name;
619624
const size_t indexesCount = tableMetadata->Indexes.size();
620625

621-
TVector<NThreading::TFuture<TGenericResult>> children;
626+
TVector<NThreading::TFuture<TTableMetadataResult>> children;
622627
children.reserve(indexesCount);
623628

624-
tableMetadata->SecondaryGlobalIndexMetadata.resize(indexesCount);
625629
const ui64 tableOwnerId = tableMetadata->PathId.OwnerId();
626630

627631
for (size_t i = 0; i < indexesCount; i++) {
628632
const auto& index = tableMetadata->Indexes[i];
629-
const auto indexTablePaths = NSchemeHelpers::CreateIndexTablePath(tableName, index.Type, index.Name);
630-
for (const auto& indexTablePath : indexTablePaths) {
633+
const auto implTablePaths = NSchemeHelpers::CreateIndexTablePath(tableName, index.Type, index.Name);
634+
for (const auto& implTablePath : implTablePaths) {
631635
if (!index.SchemaVersion) {
632636
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load index metadata without schema version check index: " << index.Name);
633637
children.push_back(
634-
LoadTableMetadata(cluster, indexTablePath,
638+
LoadTableMetadata(cluster, implTablePath,
635639
TLoadTableMetadataSettings().WithPrivateTables(true), database, userToken)
636-
.Apply([i, tableMetadata](const TFuture<TTableMetadataResult>& result) {
637-
auto value = result.GetValue();
638-
UpdateMetadataIfSuccess(tableMetadata, i, value);
639-
return static_cast<TGenericResult>(value);
640-
})
641640
);
642-
643641
} else {
644642
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load index metadata with schema version check"
645643
<< "index: " << index.Name
@@ -650,12 +648,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadIndexMeta
650648
auto ownerId = index.PathOwnerId ? index.PathOwnerId : tableOwnerId; //for compat with 20-2
651649
children.push_back(
652650
LoadIndexMetadataByPathId(cluster,
653-
NKikimr::TIndexId(ownerId, index.LocalPathId, index.SchemaVersion), indexTablePath, database, userToken)
654-
.Apply([i, tableMetadata](const TFuture<TTableMetadataResult>& result) {
655-
auto value = result.GetValue();
656-
UpdateMetadataIfSuccess(tableMetadata, i, value);
657-
return static_cast<TGenericResult>(value);
658-
})
651+
NKikimr::TIndexId(ownerId, index.LocalPathId, index.SchemaVersion), implTablePath, database, userToken)
659652
);
660653

661654
}
@@ -666,14 +659,26 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadIndexMeta
666659
auto loadIndexMetadataChecker =
667660
[ptr, result{std::move(loadTableMetadataResult)}, children](const NThreading::TFuture<void>) mutable {
668661
bool loadOk = true;
669-
for (const auto& child : children) {
670-
result.AddIssues(child.GetValue().Issues());
671-
if (!child.GetValue().Success()) {
672-
loadOk = false;
662+
663+
const auto indexesCount = result.Metadata->Indexes.size();
664+
result.Metadata->ImplTables.resize(indexesCount);
665+
auto it = children.begin();
666+
for (size_t i = 0; i < indexesCount; i++) {
667+
for (const auto& _ : NTableIndex::GetImplTables(NYql::TIndexDescription::ConvertIndexType(
668+
result.Metadata->Indexes[i].Type))) {
669+
YQL_ENSURE(it != children.end());
670+
auto value = it++->ExtractValue();
671+
result.AddIssues(value.Issues());
672+
if (loadOk && (loadOk = value.Success())) {
673+
UpdateMetadataIfSuccess(result.Metadata->ImplTables[i], value);
674+
}
673675
}
674676
}
677+
YQL_ENSURE(it == children.end());
678+
675679
auto locked = ptr.lock();
676680
if (!loadOk || !locked) {
681+
result.Metadata->ImplTables.clear();
677682
result.SetStatus(TIssuesIds::KIKIMR_INDEX_METADATA_LOAD_FAILED);
678683
} else {
679684
locked->OnLoadedTableMetadata(result);
@@ -909,13 +914,17 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
909914
case EKind::KindIndex: {
910915
Y_ENSURE(entry.ListNodeEntry, "expected children list");
911916
for (const auto& child : entry.ListNodeEntry->Children) {
917+
if (!table.EndsWith(child.Name)) {
918+
continue;
919+
}
912920
TIndexId pathId = TIndexId(child.PathId, child.SchemaVersion);
913921

914922
LoadTableMetadataCache(cluster, std::make_pair(pathId, table), settings, database, userToken)
915923
.Apply([promise](const TFuture<TTableMetadataResult>& result) mutable
916924
{
917925
promise.SetValue(result.GetValue());
918926
});
927+
break;
919928
}
920929
break;
921930
}

ydb/core/kqp/host/kqp_type_ann.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ TStatus AnnotateReadTable(const TExprNode::TPtr& node, TExprContext& ctx, const
246246
TKikimrTableMetadataPtr meta;
247247

248248
if (readIndex) {
249-
meta = table.second->Metadata->GetIndexMetadata(TString(node->Child(TKqlReadTableIndex::idx_Index)->Content())).first;
249+
meta = table.second->Metadata->GetIndexMetadata(node->Child(TKqlReadTableIndex::idx_Index)->Content()).first;
250250
if (!meta) {
251251
return TStatus::Error;
252252
}
@@ -455,7 +455,7 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
455455
if (isStreamLookup && !EnsureArgsCount(*node, TKqlStreamLookupIndex::Match(node.Get()) ? 5 : 4, ctx)) {
456456
return TStatus::Error;
457457
}
458-
458+
459459
if (!isStreamLookup && !EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
460460
return TStatus::Error;
461461
}
@@ -560,7 +560,7 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
560560
if (!EnsureAtom(*index, ctx)) {
561561
return TStatus::Error;
562562
}
563-
auto indexMeta = table.second->Metadata->GetIndexMetadata(TString(index->Content())).first;
563+
auto indexMeta = table.second->Metadata->GetIndexMetadata(index->Content()).first;
564564

565565
if (!CalcKeyColumnsCount(ctx, node->Pos(), *structType, *table.second, *indexMeta, keyColumnsCount)) {
566566
return TStatus::Error;
@@ -713,7 +713,7 @@ TStatus AnnotateUpsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const
713713
}
714714

715715
if (TKqlUpsertRowsIndex::Match(node.Get())) {
716-
Y_ENSURE(!table.second->Metadata->SecondaryGlobalIndexMetadata.empty());
716+
Y_ENSURE(!table.second->Metadata->ImplTables.empty());
717717
}
718718

719719
auto effectType = MakeKqpEffectType(ctx);
@@ -1683,7 +1683,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
16831683

16841684
} else if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows
16851685
|| settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {
1686-
1686+
16871687
if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
16881688
return TStatus::Error;
16891689
}
@@ -1820,7 +1820,7 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
18201820
} else {
18211821
node->SetTypeAnn(ctx.MakeType<TListExprType>(outputRowType));
18221822
}
1823-
1823+
18241824
return TStatus::Ok;
18251825
}
18261826

ydb/core/kqp/opt/kqp_opt_kql.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ std::pair<TExprBase, TCoAtomList> CreateRowsToReplace(const TExprBase& input,
7979

8080
bool HasIndexesToWrite(const TKikimrTableDescription& tableData) {
8181
bool hasIndexesToWrite = false;
82-
YQL_ENSURE(tableData.Metadata->Indexes.size() == tableData.Metadata->SecondaryGlobalIndexMetadata.size());
82+
YQL_ENSURE(tableData.Metadata->Indexes.size() == tableData.Metadata->ImplTables.size());
8383
for (const auto& index : tableData.Metadata->Indexes) {
8484
if (index.ItUsedForWrite()) {
8585
hasIndexesToWrite = true;
@@ -893,7 +893,7 @@ TIntrusivePtr<TKikimrTableMetadata> GetIndexMetadata(const TKqlReadTableIndex& r
893893
const TKikimrTablesData& tables, TStringBuf cluster)
894894
{
895895
const auto& tableDesc = GetTableData(tables, cluster, read.Table().Path());
896-
const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(read.Index().StringValue());
896+
const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(read.Index().Value());
897897
return indexMeta;
898898
}
899899

0 commit comments

Comments
 (0)