@@ -2214,46 +2214,221 @@ void TIndexBuildInfo::SerializeToProto([[maybe_unused]] TSchemeShard* ss, NKikim
2214
2214
}
2215
2215
}
2216
2216
2217
+ ui64 TIndexBuildInfo::TKMeans::ParentEnd () const noexcept { // included
2218
+ return ChildBegin - 1 ;
2219
+ }
2220
+ ui64 TIndexBuildInfo::TKMeans::ChildEnd () const noexcept { // included
2221
+ return ChildBegin + ChildCount () - 1 ;
2222
+ }
2223
+
2224
+ ui64 TIndexBuildInfo::TKMeans::ParentCount () const noexcept {
2225
+ return ParentEnd () - ParentBegin + 1 ;
2226
+ }
2227
+ ui64 TIndexBuildInfo::TKMeans::ChildCount () const noexcept {
2228
+ return ParentCount () * K;
2229
+ }
2230
+
2231
+ TString TIndexBuildInfo::TKMeans::DebugString () const {
2232
+ return TStringBuilder ()
2233
+ << " { "
2234
+ << " State = " << State
2235
+ << " , Level = " << Level << " / " << Levels
2236
+ << " , K = " << K
2237
+ << " , Round = " << Round
2238
+ << " , Parent = [" << ParentBegin << " .." << Parent << " .." << ParentEnd () << " ]"
2239
+ << " , Child = [" << ChildBegin << " .." << Child << " .." << ChildEnd () << " ]"
2240
+ << " , TableSize = " << TableSize
2241
+ << " }" ;
2242
+ }
2243
+
2244
+ bool TIndexBuildInfo::TKMeans::NeedsAnotherLevel () const noexcept {
2245
+ return Level < Levels;
2246
+ }
2247
+ bool TIndexBuildInfo::TKMeans::NeedsAnotherParent () const noexcept {
2248
+ return Parent < ParentEnd ();
2249
+ }
2250
+
2251
+ bool TIndexBuildInfo::TKMeans::NextParent () noexcept {
2252
+ if (!NeedsAnotherParent ()) {
2253
+ return false ;
2254
+ }
2255
+ ++Parent;
2256
+ Child += K;
2257
+ return true ;
2258
+ }
2259
+
2260
+ bool TIndexBuildInfo::TKMeans::NextLevel () noexcept {
2261
+ if (!NeedsAnotherLevel ()) {
2262
+ return false ;
2263
+ }
2264
+ NextLevel (ChildCount ());
2265
+ return true ;
2266
+ }
2267
+
2268
+ void TIndexBuildInfo::TKMeans::PrefixIndexDone (ui64 shards) {
2269
+ Y_ENSURE (NeedsAnotherLevel ());
2270
+ // There's two worst cases, but in both one shard contains TableSize rows
2271
+ // 1. all rows have unique prefix (*), in such case we need 1 id for each row (parent, id in prefix table)
2272
+ // 2. all unique prefixes have size K, so we have TableSize/K parents + TableSize childs
2273
+ // * it doesn't work now, because now prefix should have at least K embeddings, but it's bug
2274
+ NextLevel ((2 * TableSize) * shards);
2275
+ Parent = ParentEnd ();
2276
+ }
2277
+
2278
+ void TIndexBuildInfo::TKMeans::Set (ui32 level,
2279
+ NTableIndex::TClusterId parentBegin, NTableIndex::TClusterId parent,
2280
+ NTableIndex::TClusterId childBegin, NTableIndex::TClusterId child,
2281
+ ui32 state, ui64 tableSize, ui32 round) {
2282
+ Level = level;
2283
+ Round = round;
2284
+ ParentBegin = parentBegin;
2285
+ Parent = parent;
2286
+ ChildBegin = childBegin;
2287
+ Child = child;
2288
+ State = static_cast <EState>(state);
2289
+ TableSize = tableSize;
2290
+ }
2291
+
2292
+ NKikimrTxDataShard::EKMeansState TIndexBuildInfo::TKMeans::GetUpload () const {
2293
+ if (Level == 1 ) {
2294
+ if (NeedsAnotherLevel ()) {
2295
+ return NKikimrTxDataShard::EKMeansState::UPLOAD_MAIN_TO_BUILD;
2296
+ } else {
2297
+ return NKikimrTxDataShard::EKMeansState::UPLOAD_MAIN_TO_POSTING;
2298
+ }
2299
+ } else {
2300
+ if (NeedsAnotherLevel ()) {
2301
+ return NKikimrTxDataShard::EKMeansState::UPLOAD_BUILD_TO_BUILD;
2302
+ } else {
2303
+ return NKikimrTxDataShard::EKMeansState::UPLOAD_BUILD_TO_POSTING;
2304
+ }
2305
+ }
2306
+ }
2307
+
2308
+ TString TIndexBuildInfo::TKMeans::WriteTo (bool needsBuildTable) const {
2309
+ using namespace NTableIndex ::NTableVectorKmeansTreeIndex;
2310
+ TString name = PostingTable;
2311
+ if (needsBuildTable || NeedsAnotherLevel ()) {
2312
+ name += Level % 2 != 0 ? BuildSuffix0 : BuildSuffix1;
2313
+ }
2314
+ return name;
2315
+ }
2316
+
2317
+ TString TIndexBuildInfo::TKMeans::ReadFrom () const {
2318
+ Y_ENSURE (Level > 1 );
2319
+ using namespace NTableIndex ::NTableVectorKmeansTreeIndex;
2320
+ TString name = PostingTable;
2321
+ name += Level % 2 != 0 ? BuildSuffix1 : BuildSuffix0;
2322
+ return name;
2323
+ }
2324
+
2325
+ std::pair<NTableIndex::TClusterId, NTableIndex::TClusterId> TIndexBuildInfo::TKMeans::RangeToBorders (const TSerializedTableRange& range) const {
2326
+ const NTableIndex::TClusterId minParent = ParentBegin;
2327
+ const NTableIndex::TClusterId maxParent = ParentEnd ();
2328
+ const NTableIndex::TClusterId parentFrom = [&, from = range.From .GetCells ()] {
2329
+ if (!from.empty ()) {
2330
+ if (!from[0 ].IsNull ()) {
2331
+ return from[0 ].AsValue <NTableIndex::TClusterId>() + static_cast <NTableIndex::TClusterId>(from.size () == 1 );
2332
+ }
2333
+ }
2334
+ return minParent;
2335
+ }();
2336
+ const NTableIndex::TClusterId parentTo = [&, to = range.To .GetCells ()] {
2337
+ if (!to.empty ()) {
2338
+ if (!to[0 ].IsNull ()) {
2339
+ return to[0 ].AsValue <NTableIndex::TClusterId>() - static_cast <NTableIndex::TClusterId>(to.size () != 1 && to[1 ].IsNull ());
2340
+ }
2341
+ }
2342
+ return maxParent;
2343
+ }();
2344
+ Y_ENSURE (minParent <= parentFrom, " minParent(" << minParent << " ) > parentFrom(" << parentFrom << " ) " << DebugString ());
2345
+ Y_ENSURE (parentFrom <= parentTo, " parentFrom(" << parentFrom << " ) > parentTo(" << parentTo << " ) " << DebugString ());
2346
+ Y_ENSURE (parentTo <= maxParent, " parentTo(" << parentTo << " ) > maxParent(" << maxParent << " ) " << DebugString ());
2347
+ return {parentFrom, parentTo};
2348
+ }
2349
+
2350
+ TString TIndexBuildInfo::TKMeans::RangeToDebugStr (const TSerializedTableRange& range) const {
2351
+ auto toStr = [&](const TSerializedCellVec& v) -> TString {
2352
+ const auto cells = v.GetCells ();
2353
+ if (cells.empty ()) {
2354
+ return " inf" ;
2355
+ }
2356
+ if (cells[0 ].IsNull ()) {
2357
+ return " -inf" ;
2358
+ }
2359
+ auto str = TStringBuilder{} << " { count: " << cells.size ();
2360
+ if (Level > 1 ) {
2361
+ str << " , parent: " << cells[0 ].AsValue <NTableIndex::TClusterId>();
2362
+ if (cells.size () != 1 && cells[1 ].IsNull ()) {
2363
+ str << " , pk: null" ;
2364
+ }
2365
+ }
2366
+ return str << " }" ;
2367
+ };
2368
+ return TStringBuilder{} << " { From: " << toStr (range.From ) << " , To: " << toStr (range.To ) << " }" ;
2369
+ }
2370
+
2371
+ void TIndexBuildInfo::TKMeans::NextLevel (ui64 childCount) noexcept {
2372
+ ParentBegin = ChildBegin;
2373
+ Parent = ParentBegin;
2374
+ ChildBegin = ParentBegin + childCount;
2375
+ Child = ChildBegin;
2376
+ ++Level;
2377
+ }
2378
+
2217
2379
void TIndexBuildInfo::AddParent (const TSerializedTableRange& range, TShardIdx shard) {
2218
2380
// For Parent == 0 only single kmeans needed, so there are two options:
2219
2381
// 1. It fits entirely in the single shard => local kmeans for single shard
2220
2382
// 2. It doesn't fit entirely in the single shard => global kmeans for all shards
2221
- const auto [parentFrom, parentTo] = KMeans.Parent == 0
2383
+ auto [parentFrom, parentTo] = KMeans.Parent == 0
2222
2384
? std::pair<NTableIndex::TClusterId, NTableIndex::TClusterId>{0 , 0 }
2223
2385
: KMeans.RangeToBorders (range);
2224
- // TODO(mbkkt) We can make it more granular
2225
2386
2226
- // the new range does not intersect with other ranges, just add it with 1 shard
2227
2387
auto itFrom = Cluster2Shards.lower_bound (parentFrom);
2228
2388
if (itFrom == Cluster2Shards.end () || parentTo < itFrom->second .From ) {
2389
+ // The new range does not intersect with other ranges, just add it with 1 shard
2229
2390
Cluster2Shards.emplace_hint (itFrom, parentTo, TClusterShards{.From = parentFrom, .Shards = {shard}});
2230
2391
return ;
2231
2392
}
2232
2393
2233
- // otherwise, this range has multiple shards and we need to merge all intersecting ranges
2234
- auto itTo = parentTo < itFrom->first ? itFrom : Cluster2Shards.lower_bound (parentTo);
2235
- if (itTo == Cluster2Shards.end ()) {
2236
- itTo = Cluster2Shards.rbegin ().base ();
2237
- }
2238
- if (itTo->first < parentTo) {
2239
- const bool needsToReplaceFrom = itFrom == itTo;
2240
- auto node = Cluster2Shards.extract (itTo);
2241
- node.key () = parentTo;
2242
- itTo = Cluster2Shards.insert (Cluster2Shards.end (), std::move (node));
2243
- itFrom = needsToReplaceFrom ? itTo : itFrom;
2394
+ for (auto it = itFrom; it != Cluster2Shards.end () && it->second .From <= parentTo && it->first >= parentFrom; it++) {
2395
+ // The new shard may only intersect with existing shards by its starting or ending edge
2396
+ Y_ENSURE (it->second .From == parentTo || it->first == parentFrom);
2244
2397
}
2245
- auto & [toFrom, toShards] = itTo->second ;
2246
2398
2247
- toFrom = std::min (toFrom, parentFrom);
2248
- toShards.emplace_back (shard);
2399
+ if (parentFrom == itFrom->first ) {
2400
+ // Intersects by parentFrom
2401
+ if (itFrom->second .From < itFrom->first ) {
2402
+ Cluster2Shards.emplace_hint (itFrom, itFrom->first -1 , itFrom->second );
2403
+ itFrom->second .From = parentFrom;
2404
+ }
2405
+ itFrom->second .Shards .push_back (shard);
2406
+ // Increment to also check intersection by parentTo
2407
+ itFrom++;
2408
+ if (parentTo == parentFrom) {
2409
+ return ;
2410
+ }
2411
+ parentFrom++;
2412
+ }
2249
2413
2250
- while (itFrom != itTo) {
2251
- const auto & [fromFrom, fromShards] = itFrom->second ;
2252
- toFrom = std::min (toFrom, fromFrom);
2253
- Y_ASSERT (!fromShards.empty ());
2254
- toShards.insert (toShards.end (), fromShards.begin (), fromShards.end ());
2255
- itFrom = Cluster2Shards.erase (itFrom);
2414
+ if (parentTo == itFrom->second .From ) {
2415
+ // Intersects by parentTo
2416
+ if (itFrom->second .From < itFrom->first ) {
2417
+ auto endShards = itFrom->second .Shards ;
2418
+ endShards.push_back (shard);
2419
+ Cluster2Shards.emplace_hint (itFrom, parentTo, TClusterShards{.From = parentTo, .Shards = std::move (endShards)});
2420
+ itFrom->second .From = parentTo+1 ;
2421
+ } else {
2422
+ itFrom->second .Shards .push_back (shard);
2423
+ }
2424
+ if (parentTo == parentFrom) {
2425
+ return ;
2426
+ }
2427
+ parentTo--;
2256
2428
}
2429
+
2430
+ // Add the remaining range
2431
+ Cluster2Shards.emplace_hint (itFrom, parentTo, TClusterShards{.From = parentFrom, .Shards = {shard}});
2257
2432
}
2258
2433
2259
2434
TColumnFamiliesMerger::TColumnFamiliesMerger (NKikimrSchemeOp::TPartitionConfig &container)
0 commit comments