@@ -271,8 +271,15 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
271
271
}
272
272
273
273
class TReshardingTest {
274
- private:
275
- YDB_ACCESSOR (TString, ShardingType, " HASH_FUNCTION_CONSISTENCY_64" );
274
+ public:
275
+ TReshardingTest ()
276
+ : Kikimr(TKikimrSettings().SetWithSampleTables(false ))
277
+ , CSController(NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>())
278
+ , TableClient(Kikimr.GetTableClient()) {
279
+ CSController->SetOverridePeriodicWakeupActivationPeriod (TDuration::Seconds (1 ));
280
+ CSController->SetOverrideLagForCompactionBeforeTierings (TDuration::Seconds (1 ));
281
+ CSController->SetOverrideReduceMemoryIntervalLimit (1LLU << 30 );
282
+ }
276
283
277
284
void WaitResharding (const TString& hint = " " ) {
278
285
const TInstant start = TInstant::Now ();
@@ -308,22 +315,20 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
308
315
CompareYson (result, " [[" + ::ToString (expectation) + " u;]]" );
309
316
}
310
317
318
+ protected:
311
319
TKikimrRunner Kikimr;
312
- public:
320
+ NKikimr::NYDBTest::TControllers::TGuard<NKikimr::NYDBTest::NColumnShard::TController> CSController;
321
+ NYdb::NTable::TTableClient TableClient;
322
+ };
313
323
314
- TReshardingTest ()
315
- : Kikimr(TKikimrSettings().SetWithSampleTables( false )) {
324
+ class TShardingTypeTest : public TReshardingTest {
325
+ YDB_ACCESSOR (TString, ShardingType, " HASH_FUNCTION_CONSISTENCY_64 " );
316
326
317
- }
327
+ public:
328
+ using TReshardingTest::TReshardingTest;
318
329
319
330
void Execute () {
320
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
321
- csController->SetOverridePeriodicWakeupActivationPeriod (TDuration::Seconds (1 ));
322
- csController->SetOverrideLagForCompactionBeforeTierings (TDuration::Seconds (1 ));
323
- csController->SetOverrideReduceMemoryIntervalLimit (1LLU << 30 );
324
-
325
331
TLocalHelper (Kikimr).SetShardingMethod (ShardingType).CreateTestOlapTable (" olapTable" , " olapStore" , 24 , 4 );
326
- auto tableClient = Kikimr.GetTableClient ();
327
332
328
333
Tests::NCommon::TLoggerInit (Kikimr).SetComponents ({ NKikimrServices::TX_COLUMNSHARD, NKikimrServices::TX_COLUMNSHARD_SCAN }, " CS" ).SetPriority (NActors::NLog::PRI_DEBUG).Initialize ();
329
334
@@ -340,71 +345,156 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
340
345
CheckCount (230000 );
341
346
for (ui32 i = 0 ; i < 2 ; ++i) {
342
347
auto alterQuery = TStringBuilder () << R"( ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)" ;
343
- auto session = tableClient .CreateSession ().GetValueSync ().GetSession ();
348
+ auto session = TableClient .CreateSession ().GetValueSync ().GetSession ();
344
349
auto alterResult = session.ExecuteSchemeQuery (alterQuery).GetValueSync ();
345
350
UNIT_ASSERT_VALUES_EQUAL_C (alterResult.GetStatus (), NYdb::EStatus::SUCCESS, alterResult.GetIssues ().ToString ());
346
351
WaitResharding (" SPLIT:" + ::ToString (i));
347
352
}
348
353
{
349
354
auto alterQuery = TStringBuilder () << R"( ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)" ;
350
- auto session = tableClient .CreateSession ().GetValueSync ().GetSession ();
355
+ auto session = TableClient .CreateSession ().GetValueSync ().GetSession ();
351
356
auto alterResult = session.ExecuteSchemeQuery (alterQuery).GetValueSync ();
352
357
UNIT_ASSERT_VALUES_UNEQUAL_C (alterResult.GetStatus (), NYdb::EStatus::SUCCESS, alterResult.GetIssues ().ToString ());
353
358
}
354
- AFL_VERIFY (csController ->GetShardingFiltersCount ().Val () == 0 );
359
+ AFL_VERIFY (CSController ->GetShardingFiltersCount ().Val () == 0 );
355
360
CheckCount (230000 );
356
- i64 count = csController ->GetShardingFiltersCount ().Val ();
361
+ i64 count = CSController ->GetShardingFiltersCount ().Val ();
357
362
AFL_VERIFY (count >= 16 )(" count" , count);
358
- csController ->DisableBackground (NKikimr::NYDBTest::ICSController::EBackground::Indexation);
359
- csController ->DisableBackground (NKikimr::NYDBTest::ICSController::EBackground::Compaction);
360
- csController ->WaitIndexation (TDuration::Seconds (3 ));
361
- csController ->WaitCompactions (TDuration::Seconds (3 ));
363
+ CSController ->DisableBackground (NKikimr::NYDBTest::ICSController::EBackground::Indexation);
364
+ CSController ->DisableBackground (NKikimr::NYDBTest::ICSController::EBackground::Compaction);
365
+ CSController ->WaitIndexation (TDuration::Seconds (3 ));
366
+ CSController ->WaitCompactions (TDuration::Seconds (3 ));
362
367
WriteTestData (Kikimr, " /Root/olapStore/olapTable" , 1000000 , 300000000 , 10000 );
363
368
CheckCount (230000 );
364
- csController ->EnableBackground (NKikimr::NYDBTest::ICSController::EBackground::Indexation);
365
- csController ->WaitIndexation (TDuration::Seconds (5 ));
369
+ CSController ->EnableBackground (NKikimr::NYDBTest::ICSController::EBackground::Indexation);
370
+ CSController ->WaitIndexation (TDuration::Seconds (5 ));
366
371
CheckCount (230000 );
367
- csController ->EnableBackground (NKikimr::NYDBTest::ICSController::EBackground::Compaction);
368
- csController ->WaitCompactions (TDuration::Seconds (5 ));
369
- count = csController ->GetShardingFiltersCount ().Val ();
372
+ CSController ->EnableBackground (NKikimr::NYDBTest::ICSController::EBackground::Compaction);
373
+ CSController ->WaitCompactions (TDuration::Seconds (5 ));
374
+ count = CSController ->GetShardingFiltersCount ().Val ();
370
375
CheckCount (230000 );
371
376
372
- csController ->SetCompactionControl (NYDBTest::EOptimizerCompactionWeightControl::Disable);
377
+ CSController ->SetCompactionControl (NYDBTest::EOptimizerCompactionWeightControl::Disable);
373
378
374
379
CheckCount (230000 );
375
380
376
- AFL_VERIFY (count == csController->GetShardingFiltersCount ().Val ())(" count" , count)(" val" , csController->GetShardingFiltersCount ().Val ());
381
+ AFL_VERIFY (count == CSController->GetShardingFiltersCount ().Val ())(" count" , count)(
382
+ " val" , CSController->GetShardingFiltersCount ().Val ());
377
383
const ui32 portionsCount = 16 ;
378
384
for (ui32 i = 0 ; i < 4 ; ++i) {
379
385
{
380
386
auto alterQuery = TStringBuilder () << R"( ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)" ;
381
- auto session = tableClient .CreateSession ().GetValueSync ().GetSession ();
387
+ auto session = TableClient .CreateSession ().GetValueSync ().GetSession ();
382
388
auto alterResult = session.ExecuteSchemeQuery (alterQuery).GetValueSync ();
383
389
UNIT_ASSERT_VALUES_EQUAL_C (alterResult.GetStatus (), NYdb::EStatus::SUCCESS, alterResult.GetIssues ().ToString ());
384
390
}
385
391
WaitResharding (" MERGE:" + ::ToString (i));
386
- // csController ->WaitCleaning(TDuration::Seconds(5));
392
+ // CSController ->WaitCleaning(TDuration::Seconds(5));
387
393
388
394
CheckCount (230000 );
389
- AFL_VERIFY (count + portionsCount == csController->GetShardingFiltersCount ().Val ())(" count" , count)(" val" , csController->GetShardingFiltersCount ().Val ());
395
+ AFL_VERIFY (count + portionsCount == CSController->GetShardingFiltersCount ().Val ())(" count" , count)(
396
+ " val" , CSController->GetShardingFiltersCount ().Val ());
390
397
count += portionsCount;
391
398
}
392
399
{
393
400
auto alterQuery = TStringBuilder () << R"( ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)" ;
394
- auto session = tableClient .CreateSession ().GetValueSync ().GetSession ();
401
+ auto session = TableClient .CreateSession ().GetValueSync ().GetSession ();
395
402
auto alterResult = session.ExecuteSchemeQuery (alterQuery).GetValueSync ();
396
403
UNIT_ASSERT_VALUES_UNEQUAL_C (alterResult.GetStatus (), NYdb::EStatus::SUCCESS, alterResult.GetIssues ().ToString ());
397
404
}
398
- csController ->CheckInvariants ();
405
+ CSController ->CheckInvariants ();
399
406
}
400
407
};
401
408
402
409
Y_UNIT_TEST (TableReshardingConsistency64) {
403
- TReshardingTest ().SetShardingType (" HASH_FUNCTION_CONSISTENCY_64" ).Execute ();
410
+ TShardingTypeTest ().SetShardingType (" HASH_FUNCTION_CONSISTENCY_64" ).Execute ();
404
411
}
405
412
406
413
Y_UNIT_TEST (TableReshardingModuloN) {
407
- TReshardingTest ().SetShardingType (" HASH_FUNCTION_MODULO_N" ).Execute ();
414
+ TShardingTypeTest ().SetShardingType (" HASH_FUNCTION_CONSISTENCY_64" ).Execute ();
415
+ }
416
+
417
+ class TAsyncReshardingTest : public TReshardingTest {
418
+ YDB_ACCESSOR (TString, ShardingType, " HASH_FUNCTION_CONSISTENCY_64" );
419
+
420
+ public:
421
+ TAsyncReshardingTest () {
422
+ TLocalHelper (Kikimr).CreateTestOlapTable (" olapTable" , " olapStore" , 24 , 4 );
423
+ }
424
+
425
+ void AddBatch (int numRows) {
426
+ WriteTestData (Kikimr, " /Root/olapStore/olapTable" , LastPathId, LastTs, numRows);
427
+ LastPathId += numRows * 10 ;
428
+ LastTs += numRows * 10 ;
429
+ NumRows += numRows;
430
+ }
431
+
432
+ void StartResharding (TString modification) {
433
+ auto alterQuery =
434
+ TStringBuilder () << R"( ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=)"
435
+ << modification << " );" ;
436
+ auto session = TableClient.CreateSession ().GetValueSync ().GetSession ();
437
+ auto alterResult = session.ExecuteSchemeQuery (alterQuery).GetValueSync ();
438
+
439
+ UNIT_ASSERT_VALUES_EQUAL_C (alterResult.GetStatus (), NYdb::EStatus::SUCCESS, alterResult.GetIssues ().ToString ());
440
+ }
441
+
442
+ void CheckCount () {
443
+ TReshardingTest::CheckCount (NumRows);
444
+ }
445
+
446
+ void ChangeSchema () {
447
+ auto alterQuery =
448
+ " ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=level, "
449
+ " `SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, "
450
+ " `COMPRESSION.TYPE`=`zstd`);" ;
451
+ auto session = TableClient.CreateSession ().GetValueSync ().GetSession ();
452
+ auto alterResult = session.ExecuteSchemeQuery (alterQuery).GetValueSync ();
453
+
454
+ UNIT_ASSERT_VALUES_EQUAL_C (alterResult.GetStatus (), NYdb::EStatus::SUCCESS, alterResult.GetIssues ().ToString ());
455
+ }
456
+
457
+ void DisableCompaction () {
458
+ CSController->SetCompactionControl (NYDBTest::EOptimizerCompactionWeightControl::Disable);
459
+ }
460
+
461
+ private:
462
+ ui64 LastPathId = 1000000 ;
463
+ ui64 LastTs = 300000000 ;
464
+ ui64 NumRows = 0 ;
465
+ };
466
+
467
+ Y_UNIT_TEST (UpsertWhileSplitTest) {
468
+ TAsyncReshardingTest tester;
469
+
470
+ tester.AddBatch (10000 );
471
+
472
+ tester.CheckCount ();
473
+
474
+ for (int i = 0 ; i < 4 ; i++) {
475
+ tester.StartResharding (" SPLIT" );
476
+
477
+ tester.CheckCount ();
478
+ tester.AddBatch (10000 );
479
+ tester.CheckCount ();
480
+ tester.WaitResharding ();
481
+ }
482
+ tester.AddBatch (10000 );
483
+ tester.CheckCount ();
484
+ }
485
+
486
+ Y_UNIT_TEST (ChangeSchemaAndSplit) {
487
+ TAsyncReshardingTest tester;
488
+ tester.DisableCompaction ();
489
+
490
+ tester.AddBatch (10000 );
491
+ tester.ChangeSchema ();
492
+ tester.AddBatch (10000 );
493
+
494
+ tester.StartResharding (" SPLIT" );
495
+ tester.WaitResharding ();
496
+
497
+ tester.CheckCount ();
408
498
}
409
499
}
410
500
}
0 commit comments