8
8
#include < ydb/core/tx/datashard/datashard.h>
9
9
#include < ydb/core/metering/metering.h>
10
10
11
+ #include < ydb/public/lib/deprecated/kicli/kicli.h>
11
12
#include < ydb-cpp-sdk/client/table/table.h>
12
13
13
14
using namespace NKikimr ;
@@ -79,30 +80,37 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
79
80
80
81
// Just create main table
81
82
TestCreateTable (runtime, tenantSchemeShard, ++txId, " /MyRoot/ServerLessDB" , R"(
82
- Name: "Table"
83
- Columns { Name: "key" Type: "Uint32" }
84
- Columns { Name: "embedding" Type: "String" }
85
- KeyColumnNames: ["key"]
83
+ Name: "Table"
84
+ Columns { Name: "key" Type: "Uint32" }
85
+ Columns { Name: "embedding" Type: "String" }
86
+ KeyColumnNames: ["key"]
87
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 50 } } } }
88
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 150 } } } }
86
89
)" );
87
90
env.TestWaitNotification (runtime, txId, tenantSchemeShard);
88
91
89
- auto fnWriteRow = [&](ui64 tabletId, ui32 key, TString embedding, const char * table) {
90
- TString writeQuery = Sprintf (R"(
91
- (
92
- (let key '( '('key (Uint32 '%u ) ) ) )
93
- (let row '( '('embedding (String '%s ) ) ) )
94
- (return (AsList (UpdateRow '__user__%s key row) ))
95
- )
96
- )" , key, embedding.c_str (), table);
97
- NKikimrMiniKQL::TResult result;
98
- TString err;
99
- NKikimrProto::EReplyStatus status = LocalMiniKQL (runtime, tabletId, writeQuery, result, err);
100
- UNIT_ASSERT_VALUES_EQUAL (err, " " );
101
- UNIT_ASSERT_VALUES_EQUAL (status, NKikimrProto::EReplyStatus::OK);
92
+ // Write data directly into shards
93
+ auto fillRows = [&](const TString & tablePath, ui32 shard, ui32 min, ui32 max) {
94
+ TVector<TCell> cells;
95
+ ui8 str[6 ] = { 0 };
96
+ str[4 ] = (ui8)Ydb::Table::VectorIndexSettings::VECTOR_TYPE_UINT8;
97
+ for (ui32 key = min; key < max; ++key) {
98
+ str[0 ] = ((key+106 )* 7 ) % 256 ;
99
+ str[1 ] = ((key+106 )*17 ) % 256 ;
100
+ str[2 ] = ((key+106 )*37 ) % 256 ;
101
+ str[3 ] = ((key+106 )*47 ) % 256 ;
102
+ cells.emplace_back (TCell::Make (key));
103
+ cells.emplace_back (TCell ((const char *)str, 5 ));
104
+ }
105
+ std::vector<ui32> columnIds{1 , 2 };
106
+ TSerializedCellMatrix matrix (cells, max-min, 2 );
107
+ WriteOp (runtime, tenantSchemeShard, ++txId, tablePath,
108
+ shard, NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
109
+ columnIds, std::move (matrix), true );
102
110
};
103
- for (ui32 key = 0 ; key < 200 ; ++key) {
104
- fnWriteRow (TTestTxConfig::FakeHiveTablets + 6 , key, std::to_string (key), " Table " );
105
- }
111
+ fillRows ( " /MyRoot/ServerLessDB/Table " , 0 , 0 , 50 );
112
+ fillRows ( " /MyRoot/ServerLessDB/Table " , 1 , 50 , 150 );
113
+ fillRows ( " /MyRoot/ServerLessDB/Table " , 2 , 150 , 200 );
106
114
107
115
runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
108
116
runtime.SetLogPriority (NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);
@@ -118,18 +126,95 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
118
126
meteringMessages << event->Get ()->MeteringJson ;
119
127
});
120
128
121
- TestBuildVectorIndex (runtime, ++txId, tenantSchemeShard, " /MyRoot/ServerLessDB" , " /MyRoot/ServerLessDB/Table" , " index1" , " embedding" );
129
+ TBlockEvents<TEvDataShard::TEvReshuffleKMeansRequest> reshuffleBlocker (runtime, [&](const auto & ) {
130
+ return true ;
131
+ });
132
+
133
+ AsyncBuildVectorIndex (runtime, ++txId, tenantSchemeShard, " /MyRoot/ServerLessDB" , " /MyRoot/ServerLessDB/Table" , " index1" , " embedding" );
122
134
ui64 buildIndexId = txId;
123
135
136
+ // Wait for the first "reshuffle" request (samples will be already collected on the first level)
137
+ // and reboot the scheme shard to verify that its intermediate state is persisted correctly.
138
+ // The bug checked here: Sample.Probability was not persisted (#18236).
139
+ runtime.WaitFor (" ReshuffleKMeansRequest" , [&]{ return reshuffleBlocker.size (); });
140
+ Cerr << " ... rebooting scheme shard" << Endl;
141
+ RebootTablet (runtime, tenantSchemeShard, runtime.AllocateEdgeActor ());
142
+
143
+ // Now wait for the 1st level to be finalized
144
+ TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> level1Blocker (runtime, [&](auto & ev) {
145
+ const auto & record = ev->Get ()->Record ;
146
+ if (record.GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpInitiateBuildIndexImplTable) {
147
+ txId = record.GetTxId ();
148
+ return true ;
149
+ }
150
+ return false ;
151
+ });
152
+ reshuffleBlocker.Stop ();
153
+ reshuffleBlocker.Unblock (reshuffleBlocker.size ());
154
+
155
+ // Reshard the first level table (0build)
156
+ // First bug checked here: after restarting the schemeshard during reshuffle it
157
+ // generates more clusters than requested and dies with VERIFY on shard boundaries (#18278).
158
+ // Second bug checked here: posting table doesn't contain all rows from the main table
159
+ // when the build table is resharded during build (#18355).
160
+ {
161
+ auto indexDesc = DescribePath (runtime, tenantSchemeShard, " /MyRoot/ServerLessDB/Table/index1/indexImplPostingTable0build" , true , true , true );
162
+ auto parts = indexDesc.GetPathDescription ().GetTablePartitions ();
163
+ UNIT_ASSERT_EQUAL (parts.size (), 4 );
164
+ ui64 cluster = 1 ;
165
+ for (const auto & x: parts) {
166
+ TestSplitTable (runtime, tenantSchemeShard, ++txId, " /MyRoot/ServerLessDB/Table/index1/indexImplPostingTable0build" , Sprintf (R"(
167
+ SourceTabletId: %lu
168
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: %lu } } Tuple { Optional { Uint32: 50 } } } }
169
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: %lu } } Tuple { Optional { Uint32: 150 } } } }
170
+ )" , x.GetDatashardId (), cluster, cluster));
171
+ env.TestWaitNotification (runtime, txId);
172
+ cluster++;
173
+ }
174
+ }
175
+
176
+ level1Blocker.Stop ();
177
+ level1Blocker.Unblock (level1Blocker.size ());
178
+
179
+ // Now wait for the index build
180
+ {
181
+ auto expectedStatus = Ydb::StatusIds::SUCCESS;
182
+ TAutoPtr<IEventHandle> handle;
183
+ TEvIndexBuilder::TEvCreateResponse* event = runtime.GrabEdgeEvent <TEvIndexBuilder::TEvCreateResponse>(handle);
184
+ UNIT_ASSERT (event);
185
+
186
+ Cerr << " BUILDINDEX RESPONSE CREATE: " << event->ToString () << Endl;
187
+ UNIT_ASSERT_EQUAL_C (event->Record .GetStatus (), expectedStatus,
188
+ " status mismatch"
189
+ << " got " << Ydb::StatusIds::StatusCode_Name (event->Record .GetStatus ())
190
+ << " expected " << Ydb::StatusIds::StatusCode_Name (expectedStatus)
191
+ << " issues was " << event->Record .GetIssues ());
192
+ }
193
+
194
+ env.TestWaitNotification (runtime, buildIndexId, tenantSchemeShard);
195
+
196
+ // Check row count in the posting table
197
+ {
198
+ auto indexDesc = DescribePath (runtime, tenantSchemeShard, " /MyRoot/ServerLessDB/Table/index1/indexImplPostingTable" , true , true , true );
199
+ auto parts = indexDesc.GetPathDescription ().GetTablePartitions ();
200
+ ui32 rows = 0 ;
201
+ for (const auto & x: parts) {
202
+ auto result = ReadTable (runtime, x.GetDatashardId (), " indexImplPostingTable" ,
203
+ {NKikimr::NTableIndex::NTableVectorKmeansTreeIndex::ParentColumn, " key" }, {" key" });
204
+ auto value = NClient::TValue::Create (result);
205
+ rows += value[" Result" ][" List" ].Size ();
206
+ }
207
+ Cerr << " ... posting table contains " << rows << " rows" << Endl;
208
+ UNIT_ASSERT_VALUES_EQUAL (rows, 200 );
209
+ }
210
+
124
211
auto listing = TestListBuildIndex (runtime, tenantSchemeShard, " /MyRoot/ServerLessDB" );
125
212
UNIT_ASSERT_VALUES_EQUAL (listing.EntriesSize (), 1 );
126
213
127
- env.TestWaitNotification (runtime, txId, tenantSchemeShard);
128
-
129
- auto descr = TestGetBuildIndex (runtime, tenantSchemeShard, " /MyRoot/ServerLessDB" , txId);
214
+ auto descr = TestGetBuildIndex (runtime, tenantSchemeShard, " /MyRoot/ServerLessDB" , buildIndexId);
130
215
UNIT_ASSERT_VALUES_EQUAL (descr.GetIndexBuild ().GetState (), Ydb::Table::IndexBuildState::STATE_DONE);
131
216
132
- const TString meteringData = R"( {"usage":{"start":0,"quantity":128 ,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106 -72075186233409549-2-0-0-0-0-200-0-1290-0 ","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})" " \n " ;
217
+ const TString meteringData = R"( {"usage":{"start":0,"quantity":431 ,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"109 -72075186233409549-2-0-0-0-0-619-605-11328-10960 ","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})" " \n " ;
133
218
134
219
UNIT_ASSERT_NO_DIFF (meteringMessages, meteringData);
135
220
@@ -152,6 +237,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
152
237
)" );
153
238
env.TestWaitNotification (runtime, txId, tenantSchemeShard);
154
239
240
+ Cerr << " ... rebooting scheme shard" << Endl;
155
241
RebootTablet (runtime, tenantSchemeShard, runtime.AllocateEdgeActor ());
156
242
157
243
TestDescribeResult (DescribePath (runtime, tenantSchemeShard, " /MyRoot/ServerLessDB/Table" ),
@@ -200,16 +286,14 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
200
286
NLs::ExtractTenantSchemeshard (&tenantSchemeShard)});
201
287
202
288
TestCreateTable (runtime, tenantSchemeShard, ++txId, " /MyRoot/CommonDB" , R"(
203
- Name: "Table"
204
- Columns { Name: "key" Type: "Uint32" }
205
- Columns { Name: "embedding" Type: "String" }
206
- KeyColumnNames: ["key"]
289
+ Name: "Table"
290
+ Columns { Name: "key" Type: "Uint32" }
291
+ Columns { Name: "embedding" Type: "String" }
292
+ KeyColumnNames: ["key"]
207
293
)" );
208
294
env.TestWaitNotification (runtime, txId, tenantSchemeShard);
209
295
210
- for (ui32 key = 100 ; key < 300 ; ++key) {
211
- fnWriteRow (TTestTxConfig::FakeHiveTablets + 6 , key, std::to_string (key), " Table" );
212
- }
296
+ fillRows (" /MyRoot/CommonDB/Table" , 0 , 100 , 300 );
213
297
214
298
TVector<TString> billRecords;
215
299
observerHolder = runtime.AddObserver <NMetering::TEvMetering::TEvWriteMeteringJson>([&](NMetering::TEvMetering::TEvWriteMeteringJson::TPtr& event) {
0 commit comments