@@ -1479,7 +1479,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
1479
1479
}
1480
1480
1481
1481
if (partSwitch.FollowerUpdateStep ) {
1482
- auto subset = Database->Subset (partSwitch.TableId , partSwitch.Leaving , partSwitch.Head );
1482
+ auto subset = Database->PartSwitchSubset (partSwitch.TableId , partSwitch.Head , partSwitch. Leaving , partSwitch.LeavingTxStatus );
1483
1483
1484
1484
if (partSwitch.Head != subset->Head ) {
1485
1485
Y_ABORT (" Follower table epoch head has diverged from leader" );
@@ -1488,30 +1488,36 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
1488
1488
}
1489
1489
1490
1490
Y_ABORT_UNLESS (newColdParts.empty (), " Unexpected cold part at a follower" );
1491
- Database->Replace (partSwitch.TableId , std::move (newParts), *subset);
1492
- Database->ReplaceTxStatus (partSwitch.TableId , std::move (newTxStatus), *subset);
1491
+ Database->Replace (partSwitch.TableId , *subset, std::move (newParts), std::move (newTxStatus));
1493
1492
1494
1493
for (auto &gone : subset->Flatten )
1495
1494
DropCachesOfBundle (*gone);
1496
1495
1497
1496
Send (Owner->Tablet (), new TEvTablet::TEvFGcAck (Owner->TabletID (), Generation (), partSwitch.FollowerUpdateStep ));
1498
1497
} else {
1498
+ bool merged = false ;
1499
1499
for (auto &partView : newParts) {
1500
1500
Database->Merge (partSwitch.TableId , partView);
1501
+ merged = true ;
1501
1502
1502
1503
if (CompactionLogic) {
1503
1504
CompactionLogic->BorrowedPart (partSwitch.TableId , std::move (partView));
1504
1505
}
1505
1506
}
1506
1507
for (auto &part : newColdParts) {
1507
1508
Database->Merge (partSwitch.TableId , part);
1509
+ merged = true ;
1508
1510
1509
1511
if (CompactionLogic) {
1510
1512
CompactionLogic->BorrowedPart (partSwitch.TableId , std::move (part));
1511
1513
}
1512
1514
}
1513
1515
for (auto &txStatus : newTxStatus) {
1514
1516
Database->Merge (partSwitch.TableId , txStatus);
1517
+ merged = true ;
1518
+ }
1519
+ if (merged) {
1520
+ Database->MergeDone (partSwitch.TableId );
1515
1521
}
1516
1522
}
1517
1523
@@ -1533,7 +1539,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
1533
1539
// N.B. there should be a single source table per part switch
1534
1540
for (auto & [sourceTable, state] : perTable) {
1535
1541
// Rebase source parts to their respective new epochs
1536
- auto srcSubset = Database->Subset (sourceTable, state. Bundles , NTable::TEpoch::Zero ());
1542
+ auto srcSubset = Database->PartSwitchSubset (sourceTable, NTable::TEpoch::Zero (), state. Bundles , { } );
1537
1543
TVector<NTable::TPartView> rebased (Reserve (srcSubset->Flatten .size ()));
1538
1544
for (const auto & partView : srcSubset->Flatten ) {
1539
1545
Y_ABORT_UNLESS (!partView->TxIdStats , " Cannot move parts with uncommitted deltas" );
@@ -1542,7 +1548,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
1542
1548
}
1543
1549
1544
1550
// Remove source parts from the source table
1545
- Database->Replace (sourceTable, { }, *srcSubset );
1551
+ Database->Replace (sourceTable, *srcSubset, { }, { } );
1546
1552
1547
1553
if (CompactionLogic) {
1548
1554
CompactionLogic->RemovedParts (sourceTable, state.Bundles );
@@ -1557,6 +1563,8 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
1557
1563
}
1558
1564
}
1559
1565
}
1566
+
1567
+ Database->MergeDone (partSwitch.TableId );
1560
1568
}
1561
1569
}
1562
1570
@@ -2310,7 +2318,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
2310
2318
}
2311
2319
2312
2320
// Remove source parts from the source table
2313
- Database->Replace (src, { }, *srcSubset );
2321
+ Database->Replace (src, *srcSubset, { }, { } );
2314
2322
2315
2323
const auto logicResult = CompactionLogic->RemovedParts (src, labels);
2316
2324
@@ -2342,6 +2350,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
2342
2350
Database->Merge (dst, partView);
2343
2351
CompactionLogic->BorrowedPart (dst, partView);
2344
2352
}
2353
+ Database->MergeDone (dst);
2345
2354
2346
2355
// Serialize rebased parts as moved from the source table
2347
2356
NKikimrExecutorFlat::TTablePartSwitch proto;
@@ -3050,7 +3059,7 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable::
3050
3059
TAutoPtr<NTable::TSubset> subset;
3051
3060
3052
3061
if (params) {
3053
- subset = Database->Subset (table, { }, params->Edge .Head );
3062
+ subset = Database->CompactionSubset (table, params->Edge .Head , { } );
3054
3063
3055
3064
if (params->Parts ) {
3056
3065
subset->Flatten .insert (subset->Flatten .end (), params->Parts .begin (), params->Parts .end ());
@@ -3397,8 +3406,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
3397
3406
newParts.emplace_back (result.Part );
3398
3407
}
3399
3408
3400
- Database->Replace (tableId, newParts, *ops->Subset );
3401
- Database->ReplaceTxStatus (tableId, newTxStatus, *ops->Subset );
3409
+ Database->Replace (tableId, *ops->Subset , newParts, newTxStatus);
3402
3410
3403
3411
TVector<TLogoBlobID> bundles (Reserve (ops->Subset ->Flatten .size () + ops->Subset ->ColdParts .size ()));
3404
3412
for (auto &part: ops->Subset ->Flatten ) {
@@ -4525,23 +4533,28 @@ ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)
4525
4533
if (!memTableSnapshot->GetCommittedTransactions ().empty () || !memTableSnapshot->GetRemovedTransactions ().empty ()) {
4526
4534
// We must compact tx status when mem table has changes
4527
4535
compactTxStatus = true ;
4536
+ break ;
4528
4537
}
4529
4538
}
4530
4539
for (const auto & txStatus : snapshot->Subset ->TxStatus ) {
4531
4540
if (txStatus->Label .TabletID () != Owner->TabletID ()) {
4532
4541
// We want to compact borrowed tx status
4533
4542
compactTxStatus = true ;
4543
+ break ;
4534
4544
}
4535
4545
}
4546
+ if (snapshot->Subset ->TxStatus && snapshot->Subset ->GarbageTransactions ) {
4547
+ // We want to remove garbage transactions
4548
+ compactTxStatus = true ;
4549
+ }
4536
4550
4537
4551
if (compactTxStatus) {
4538
- comp->CommittedTransactions = snapshot->Subset ->CommittedTransactions ;
4539
- comp->RemovedTransactions = snapshot->Subset ->RemovedTransactions ;
4540
4552
comp->Frozen .reserve (snapshot->Subset ->Frozen .size ());
4541
4553
for (auto & memTableSnapshot : snapshot->Subset ->Frozen ) {
4542
4554
comp->Frozen .push_back (memTableSnapshot.MemTable );
4543
4555
}
4544
4556
comp->TxStatus = snapshot->Subset ->TxStatus ;
4557
+ comp->GarbageTransactions = snapshot->Subset ->GarbageTransactions ;
4545
4558
} else {
4546
4559
// We are not compacting tx status, avoid deleting current blobs
4547
4560
snapshot->Subset ->TxStatus .clear ();
0 commit comments