Skip to content

Commit f32ba67

Browse files
committed
Fix duplicate rows in prefixed vector index when shard indexes are out of order, add a reboot test (#19750)
1 parent 7d896ef commit f32ba67

File tree

10 files changed

+327
-135
lines changed

10 files changed

+327
-135
lines changed

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -348,9 +348,9 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateBuildPropose(
348348
op.SetName(TString::Join(PostingTable, suffix));
349349
NTableIndex::FillIndexTableColumns(tableInfo->Columns, implTableColumns.Keys, implTableColumns.Columns, op);
350350
auto& policy = *resetPartitionsSettings();
351-
const auto shards = tableInfo->GetShard2PartitionIdx().size();
352-
policy.SetMinPartitionsCount(shards);
353-
policy.SetMaxPartitionsCount(shards);
351+
// Prevent merging partitions
352+
policy.SetMinPartitionsCount(32768);
353+
policy.SetMaxPartitionsCount(0);
354354

355355
LOG_DEBUG_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX,
356356
"CreateBuildPropose " << buildInfo.Id << " " << buildInfo.State << " " << propose->Record.ShortDebugString());
@@ -867,6 +867,10 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
867867
if (NoShardsAdded(buildInfo)) {
868868
AddAllShards(buildInfo);
869869
}
870+
size_t i = 0;
871+
for (auto& [shardIdx, shardStatus]: buildInfo.Shards) {
872+
shardStatus.Index = i++;
873+
}
870874
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendPrefixKMeansRequest(shardIdx, buildInfo); }) &&
871875
buildInfo.DoneShards.size() == buildInfo.Shards.size();
872876
}
@@ -1428,7 +1432,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
14281432
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange));
14291433
buildInfo.AddParent(shardRange, x.ShardIdx);
14301434
}
1431-
auto [it, emplaced] = buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), "", buildInfo.Shards.size()});
1435+
auto [it, emplaced] = buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), ""});
14321436
Y_ENSURE(emplaced);
14331437
shardRange.From = std::move(bound);
14341438

ydb/core/tx/schemeshard/schemeshard_info_types.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2172,10 +2172,9 @@ void TImportInfo::AddNotifySubscriber(const TActorId &actorId) {
21722172
Subscribers.insert(actorId);
21732173
}
21742174

2175-
TIndexBuildInfo::TShardStatus::TShardStatus(TSerializedTableRange range, TString lastKeyAck, size_t shardsCount)
2175+
TIndexBuildInfo::TShardStatus::TShardStatus(TSerializedTableRange range, TString lastKeyAck)
21762176
: Range(std::move(range))
21772177
, LastKeyAck(std::move(lastKeyAck))
2178-
, Index(shardsCount)
21792178
{}
21802179

21812180
void TIndexBuildInfo::SerializeToProto(TSchemeShard* ss, NKikimrSchemeOp::TIndexBuildConfig* result) const {

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3262,7 +3262,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
32623262
TSerializedTableRange Range;
32633263
TString LastKeyAck;
32643264
ui64 SeqNoRound = 0;
3265-
size_t Index = 0; // size of Shards map before this element was added
3265+
size_t Index = 0; // used only in prefixed vector index: a unique number of shard in the list
32663266

32673267
NKikimrIndexBuilder::EBuildStatus Status = NKikimrIndexBuilder::EBuildStatus::INVALID;
32683268

@@ -3271,7 +3271,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
32713271

32723272
TBillingStats Processed;
32733273

3274-
TShardStatus(TSerializedTableRange range, TString lastKeyAck, size_t shardsCount);
3274+
TShardStatus(TSerializedTableRange range, TString lastKeyAck);
32753275

32763276
TString ToString(TShardIdx shardIdx = InvalidShardIdx) const {
32773277
TStringBuilder result;
@@ -3615,7 +3615,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
36153615
AddParent(bound, shardIdx);
36163616
}
36173617
Shards.emplace(
3618-
shardIdx, TIndexBuildInfo::TShardStatus(std::move(bound), std::move(lastKeyAck), Shards.size()));
3618+
shardIdx, TIndexBuildInfo::TShardStatus(std::move(bound), std::move(lastKeyAck)));
36193619
TIndexBuildInfo::TShardStatus &shardStatus = Shards.at(shardIdx);
36203620

36213621
shardStatus.Status =

ydb/core/tx/schemeshard/ut_helpers/helpers.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1777,10 +1777,10 @@ namespace NSchemeShardUT_Private {
17771777
}
17781778

17791779
void AsyncBuildVectorIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName,
1780-
const TString &src, const TString &name, TString column, TVector<TString> dataColumns)
1780+
const TString &src, const TString &name, TVector<TString> columns, TVector<TString> dataColumns)
17811781
{
17821782
AsyncBuildIndex(runtime, id, schemeShard, dbName, src, TBuildIndexConfig{
1783-
name, NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree, {column}, std::move(dataColumns)
1783+
name, NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree, columns, std::move(dataColumns)
17841784
});
17851785
}
17861786

@@ -2647,7 +2647,8 @@ namespace NSchemeShardUT_Private {
26472647
return CountRows(runtime, TTestTxConfig::SchemeShard, table);
26482648
}
26492649

2650-
void WriteVectorTableRows(TTestActorRuntime& runtime, ui64 schemeShardId, ui64 txId, const TString & tablePath, bool withValue, ui32 shard, ui32 min, ui32 max) {
2650+
void WriteVectorTableRows(TTestActorRuntime& runtime, ui64 schemeShardId, ui64 txId, const TString & tablePath,
2651+
ui32 shard, ui32 min, ui32 max, std::vector<ui32> columnIds) {
26512652
TVector<TCell> cells;
26522653
ui8 str[6] = { 0 };
26532654
str[4] = (ui8)Ydb::Table::VectorIndexSettings::VECTOR_TYPE_UINT8;
@@ -2658,16 +2659,15 @@ namespace NSchemeShardUT_Private {
26582659
str[3] = ((key+106)*47) % 256;
26592660
cells.emplace_back(TCell::Make(key));
26602661
cells.emplace_back(TCell((const char*)str, 5));
2661-
if (withValue) {
2662-
// optionally use the same value for an additional covered string column
2663-
cells.emplace_back(TCell((const char*)str, 5));
2664-
}
2662+
// optional prefix ui32 column
2663+
cells.emplace_back(TCell::Make(key % 17));
2664+
// optionally use the same value for an additional covered string column
2665+
cells.emplace_back(TCell((const char*)str, 5));
26652666
}
2666-
std::vector<ui32> columnIds{1, 2};
2667-
if (withValue) {
2668-
columnIds.push_back(3);
2667+
if (!columnIds.size()) {
2668+
columnIds = {1, 2, 3, 4};
26692669
}
2670-
TSerializedCellMatrix matrix(cells, max-min, withValue ? 3 : 2);
2670+
TSerializedCellMatrix matrix(cells, max-min, columnIds.size());
26712671
WriteOp(runtime, schemeShardId, txId, tablePath,
26722672
shard, NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
26732673
columnIds, std::move(matrix), true);

ydb/core/tx/schemeshard/ut_helpers/helpers.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ namespace NSchemeShardUT_Private {
392392
void AsyncBuildColumn(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString& columnName, const Ydb::TypedValue& literal);
393393
void AsyncBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TBuildIndexConfig &cfg);
394394
void AsyncBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString &name, TVector<TString> columns, TVector<TString> dataColumns = {});
395-
void AsyncBuildVectorIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString &name, TString column, TVector<TString> dataColumns = {});
395+
void AsyncBuildVectorIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString &name, TVector<TString> columns, TVector<TString> dataColumns = {});
396396
void TestBuildColumn(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName,
397397
const TString &src, const TString& columnName, const Ydb::TypedValue& literal, Ydb::StatusIds::StatusCode expectedStatus);
398398
void TestBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TBuildIndexConfig &cfg, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
@@ -657,8 +657,8 @@ namespace NSchemeShardUT_Private {
657657
ui32 CountRows(TTestActorRuntime& runtime, const TString& table);
658658

659659
void WriteVectorTableRows(TTestActorRuntime& runtime, ui64 schemeShardId, ui64 txId, const TString & tablePath,
660-
bool withValue, ui32 shard, ui32 min, ui32 max);
661-
660+
ui32 shard, ui32 min, ui32 max, std::vector<ui32> columnIds = {});
661+
662662
void TestCreateServerLessDb(TTestActorRuntime& runtime, TTestEnv& env, ui64& txId, ui64& tenantSchemeShard);
663663

664664
} //NSchemeShardUT_Private

ydb/core/tx/schemeshard/ut_helpers/test_env.cpp

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,22 @@ NSchemeShardUT_Private::TTestWithReboots::TTestWithReboots(bool killOnCommit, NS
10421042
TabletIds.push_back(datashard+6);
10431043
TabletIds.push_back(datashard+7);
10441044
TabletIds.push_back(datashard+8);
1045+
1046+
NoRebootEventTypes.insert(TEvSchemeShard::EvModifySchemeTransaction);
1047+
NoRebootEventTypes.insert(TEvSchemeShard::EvDescribeScheme);
1048+
NoRebootEventTypes.insert(TEvSchemeShard::EvNotifyTxCompletion);
1049+
NoRebootEventTypes.insert(TEvSchemeShard::EvMeasureSelfResponseTime);
1050+
NoRebootEventTypes.insert(TEvSchemeShard::EvWakeupToMeasureSelfResponseTime);
1051+
NoRebootEventTypes.insert(TEvTablet::EvLocalMKQL);
1052+
NoRebootEventTypes.insert(TEvFakeHive::EvSubscribeToTabletDeletion);
1053+
NoRebootEventTypes.insert(TEvSchemeShard::EvCancelTx);
1054+
NoRebootEventTypes.insert(TEvExport::EvCreateExportRequest);
1055+
NoRebootEventTypes.insert(TEvIndexBuilder::EvCreateRequest);
1056+
NoRebootEventTypes.insert(TEvIndexBuilder::EvGetRequest);
1057+
NoRebootEventTypes.insert(TEvIndexBuilder::EvCancelRequest);
1058+
NoRebootEventTypes.insert(TEvIndexBuilder::EvForgetRequest);
1059+
// without it, ut_vector_index_build_reboots test hangs on GetRequest on the very first reboot
1060+
NoRebootEventTypes.insert(TEvTablet::EvCommitResult);
10451061
}
10461062

10471063
void NSchemeShardUT_Private::TTestWithReboots::Run(std::function<void (TTestActorRuntime &, bool &)> testScenario) {
@@ -1076,9 +1092,10 @@ struct NSchemeShardUT_Private::TTestWithReboots::TFinalizer {
10761092
};
10771093

10781094
void NSchemeShardUT_Private::TTestWithReboots::RunWithTabletReboots(std::function<void (TTestActorRuntime &, bool &)> testScenario) {
1079-
RunTestWithReboots(TabletIds,
1080-
[&]() {
1081-
return PassUserRequests;
1095+
RunTestWithReboots(TabletIds, [&]() {
1096+
return [this](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
1097+
return PassUserRequests(runtime, event);
1098+
};
10821099
},
10831100
[&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
10841101
TFinalizer finalizer(*this);
@@ -1092,7 +1109,9 @@ void NSchemeShardUT_Private::TTestWithReboots::RunWithTabletReboots(std::functio
10921109
void NSchemeShardUT_Private::TTestWithReboots::RunWithPipeResets(std::function<void (TTestActorRuntime &, bool &)> testScenario) {
10931110
RunTestWithPipeResets(TabletIds,
10941111
[&]() {
1095-
return PassUserRequests;
1112+
return [this](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
1113+
return PassUserRequests(runtime, event);
1114+
};
10961115
},
10971116
[&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
10981117
TFinalizer finalizer(*this);
@@ -1161,22 +1180,7 @@ void NSchemeShardUT_Private::TTestWithReboots::Finalize() {
11611180

11621181
bool NSchemeShardUT_Private::TTestWithReboots::PassUserRequests(TTestActorRuntimeBase &runtime, TAutoPtr<IEventHandle> &event) {
11631182
Y_UNUSED(runtime);
1164-
return event->Type == TEvSchemeShard::EvModifySchemeTransaction ||
1165-
event->Type == TEvSchemeShard::EvDescribeScheme ||
1166-
event->Type == TEvSchemeShard::EvNotifyTxCompletion ||
1167-
event->Type == TEvSchemeShard::EvMeasureSelfResponseTime ||
1168-
event->Type == TEvSchemeShard::EvWakeupToMeasureSelfResponseTime ||
1169-
event->Type == TEvTablet::EvLocalMKQL ||
1170-
event->Type == TEvFakeHive::EvSubscribeToTabletDeletion ||
1171-
event->Type == TEvSchemeShard::EvCancelTx ||
1172-
event->Type == TEvExport::EvCreateExportRequest ||
1173-
event->Type == TEvIndexBuilder::EvCreateRequest ||
1174-
event->Type == TEvIndexBuilder::EvGetRequest ||
1175-
event->Type == TEvIndexBuilder::EvCancelRequest ||
1176-
event->Type == TEvIndexBuilder::EvForgetRequest ||
1177-
// without it, ut_vector_index_build_reboots test hangs on GetRequest on the very first reboot
1178-
event->Type == TEvTablet::EvCommitResult
1179-
;
1183+
return NoRebootEventTypes.contains(event->Type);
11801184
}
11811185

11821186
NSchemeShardUT_Private::TTestEnvOptions& NSchemeShardUT_Private::TTestWithReboots::GetTestEnvOptions() {

ydb/core/tx/schemeshard/ut_helpers/test_env.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ namespace NSchemeShardUT_Private {
169169

170170
public:
171171
TVector<ui64> TabletIds;
172+
TSet<ui32> NoRebootEventTypes;
172173
THolder<TTestActorRuntime> Runtime;
173174
TTestEnvOptions EnvOpts;
174175
THolder<TTestEnv> TestEnv;
@@ -202,7 +203,7 @@ namespace NSchemeShardUT_Private {
202203
private:
203204
virtual TTestEnv* CreateTestEnv();
204205
// Make sure that user requests are not dropped
205-
static bool PassUserRequests(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
206+
bool PassUserRequests(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
206207

207208
private:
208209
struct TFinalizer;

ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,19 @@ class TTestWithPipeResets: public TTestWithReboots {
3838
template <typename T> \
3939
void N(NUnitTest::TTestContext&)
4040

41+
#define Y_UNIT_TEST_WITH_REBOOTS_FLAG(N, OPT) \
42+
template <typename T, bool OPT> \
43+
void N(NUnitTest::TTestContext&); \
44+
struct TTestRegistration##N { \
45+
TTestRegistration##N() { \
46+
TCurrentTest::AddTest(#N "-" #OPT "[TabletReboots]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<TTestWithTabletReboots, false>), false); \
47+
TCurrentTest::AddTest(#N "-" #OPT "[PipeResets]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<TTestWithPipeResets, false>), false); \
48+
TCurrentTest::AddTest(#N "+" #OPT "[TabletReboots]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<TTestWithTabletReboots, true>), false); \
49+
TCurrentTest::AddTest(#N "+" #OPT "[PipeResets]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<TTestWithPipeResets, true>), false); \
50+
} \
51+
}; \
52+
static TTestRegistration##N testRegistration##N; \
53+
template <typename T, bool OPT> \
54+
void N(NUnitTest::TTestContext&)
55+
4156
}

0 commit comments

Comments
 (0)