Skip to content

Commit af95436

Browse files
authored
stable-25-1: Fixes for data erasure manager (#20352)
2 parents 95c30d4 + f86fe63 commit af95436

11 files changed

+154
-30
lines changed

ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class TDataErasureManager {
5454
virtual bool Remove(const TPathId& pathId) = 0;
5555
virtual bool Remove(const TShardIdx& shardIdx) = 0;
5656
virtual void HandleNewPartitioning(const std::vector<TShardIdx>& dataErasureShards, NIceDb::TNiceDb& db) = 0;
57+
virtual void SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) = 0;
5758

5859
void Clear();
5960

@@ -128,6 +129,7 @@ using TQueue = NOperationQueue::TOperationQueueWithTimer<
128129
bool Remove(const TPathId& pathId) override;
129130
bool Remove(const TShardIdx& shardIdx) override;
130131
void HandleNewPartitioning(const std::vector<TShardIdx>& dataErasureShards, NIceDb::TNiceDb& db) override;
132+
void SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) override;
131133

132134
private:
133135
static TQueue::TConfig ConvertConfig(const NKikimrConfig::TDataErasureConfig& config);
@@ -189,6 +191,7 @@ using TQueue = NOperationQueue::TOperationQueueWithTimer<
189191
bool Remove(const TPathId& pathId) override;
190192
bool Remove(const TShardIdx& shardIdx) override;
191193
void HandleNewPartitioning(const std::vector<TShardIdx>& dataErasureShards, NIceDb::TNiceDb& db) override;
194+
void SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) override;
192195

193196
private:
194197
static TQueue::TConfig ConvertConfig(const NKikimrConfig::TDataErasureConfig& config);

ydb/core/tx/schemeshard/schemeshard__operation_common.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "schemeshard__operation_common.h"
2+
#include "schemeshard__data_erasure_manager.h"
23

34
#include <ydb/core/blob_depot/events.h>
45
#include <ydb/core/blockstore/core/blockstore.h>
@@ -928,8 +929,16 @@ void UpdatePartitioningForCopyTable(TOperationId operationId, TTxState &txState,
928929
TShardInfo datashardInfo = TShardInfo::DataShardInfo(operationId.GetTxId(), txState.TargetPathId);
929930
datashardInfo.BindedChannels = channelsBinding;
930931

931-
context.SS->SetPartitioning(txState.TargetPathId, dstTableInfo,
932-
ApplyPartitioningCopyTable(datashardInfo, srcTableInfo, txState, context.SS));
932+
auto newPartitioning = ApplyPartitioningCopyTable(datashardInfo, srcTableInfo, txState, context.SS);
933+
TVector<TShardIdx> newShardsIdx;
934+
newShardsIdx.reserve(newPartitioning.size());
935+
for (const auto& part : newPartitioning) {
936+
newShardsIdx.push_back(part.ShardIdx);
937+
}
938+
context.SS->SetPartitioning(txState.TargetPathId, dstTableInfo, std::move(newPartitioning));
939+
if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
940+
context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx)));
941+
}
933942

934943
ui32 newShardCout = dstTableInfo->GetPartitions().size();
935944

ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "schemeshard__operation_part.h"
22
#include "schemeshard__operation_common.h"
3+
#include "schemeshard__data_erasure_manager.h"
34
#include "schemeshard_impl.h"
45
#include "schemeshard_tx_infly.h"
56
#include "schemeshard_cdc_stream_common.h"
@@ -670,11 +671,17 @@ class TCopyTable: public TSubOperation {
670671
TShardInfo datashardInfo = TShardInfo::DataShardInfo(OperationId.GetTxId(), newTable->PathId);
671672
datashardInfo.BindedChannels = channelsBinding;
672673
auto newPartition = NTableState::ApplyPartitioningCopyTable(datashardInfo, srcTableInfo, txState, context.SS);
674+
TVector<TShardIdx> newShardsIdx;
675+
newShardsIdx.reserve(newPartition.size());
673676
for (const auto& part: newPartition) {
674677
context.MemChanges.GrabNewShard(context.SS, part.ShardIdx);
675678
context.DbChanges.PersistShard(part.ShardIdx);
679+
newShardsIdx.push_back(part.ShardIdx);
676680
}
677681
context.SS->SetPartitioning(newTable->PathId, tableInfo, std::move(newPartition));
682+
if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
683+
context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx)));
684+
}
678685
for (const auto& shard : tableInfo->GetPartitions()) {
679686
Y_ABORT_UNLESS(context.SS->ShardInfos.contains(shard.ShardIdx), "shard info is set before");
680687
if (storePerShardConfig) {

ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "schemeshard__operation_part.h"
22
#include "schemeshard__operation_common.h"
3+
#include "schemeshard__data_erasure_manager.h"
34
#include "schemeshard_impl.h"
45

56
#include <ydb/core/base/subdomain.h>
@@ -244,6 +245,7 @@ class TTransferData: public TSubOperationState {
244245

245246
// Replace all Src datashard(s) with Dst datashard(s)
246247
TVector<TTableShardInfo> newPartitioning;
248+
TVector<TShardIdx> newShardsIdx;
247249
THashSet<TShardIdx> allSrcShardIdxs;
248250
for (const auto& txShard : txState->Shards) {
249251
if (txShard.Operation == TTxState::TransferData)
@@ -280,6 +282,7 @@ class TTransferData: public TSubOperationState {
280282
}
281283

282284
newPartitioning.push_back(dst);
285+
newShardsIdx.push_back(dst.ShardIdx);
283286
}
284287

285288
dstAdded = true;
@@ -293,6 +296,9 @@ class TTransferData: public TSubOperationState {
293296
// Delete the whole old partitioning and persist the whole new partitioning as the indexes have changed
294297
context.SS->PersistTablePartitioningDeletion(db, tableId, tableInfo);
295298
context.SS->SetPartitioning(tableId, tableInfo, std::move(newPartitioning));
299+
if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
300+
context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx)));
301+
}
296302
context.SS->PersistTablePartitioning(db, tableId, tableInfo);
297303
context.SS->PersistTablePartitionStats(db, tableId, tableInfo);
298304

ydb/core/tx/schemeshard/schemeshard__root_data_erasure_manager.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) {
131131
Status = EDataErasureStatus::IN_PROGRESS_BSC;
132132
}
133133
db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status,
134-
Schema::DataErasureGenerations::StartTime>(Status, StartTime.MicroSeconds());
134+
Schema::DataErasureGenerations::StartTime>(Status, StartTime.MicroSeconds());
135135

136136
const auto ctx = SchemeShard->ActorContext();
137137
LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
@@ -481,6 +481,13 @@ void TRootDataErasureManager::HandleNewPartitioning(const std::vector<TShardIdx>
481481
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [HandleNewPartitioning] Cannot execute in root schemeshard: " << SchemeShard->TabletID());
482482
}
483483

484+
void TRootDataErasureManager::SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) {
485+
db.Table<Schema::DataErasureGenerations>().Key(GetGeneration()).Delete();
486+
SetGeneration(currentBscGeneration + 1);
487+
db.Table<Schema::DataErasureGenerations>().Key(GetGeneration()).Update<Schema::DataErasureGenerations::Status,
488+
Schema::DataErasureGenerations::StartTime>(GetStatus(), StartTime.MicroSeconds());
489+
}
490+
484491
void TRootDataErasureManager::UpdateMetrics() {
485492
SchemeShard->TabletCounters->Simple()[COUNTER_DATA_ERASURE_QUEUE_SIZE].Set(Queue->Size());
486493
SchemeShard->TabletCounters->Simple()[COUNTER_DATA_ERASURE_QUEUE_RUNNING].Set(Queue->RunningSize());
@@ -633,13 +640,15 @@ struct TSchemeShard::TTxCompleteDataErasureBSC : public TSchemeShard::TRwTxBase
633640

634641
const auto& record = Ev->Get()->Record;
635642
auto& manager = Self->DataErasureManager;
636-
if (record.GetCurrentGeneration() != manager->GetGeneration()) {
643+
NIceDb::TNiceDb db(txc.DB);
644+
if (ui64 currentBscGeneration = record.GetCurrentGeneration(); currentBscGeneration > manager->GetGeneration()) {
637645
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
638-
"TTxCompleteDataErasureBSC Unknown generation#" << record.GetCurrentGeneration() << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID());
646+
"TTxCompleteDataErasureBSC Unknown generation#" << currentBscGeneration << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID());
647+
manager->SyncBscGeneration(db, currentBscGeneration);
648+
manager->SendRequestToBSC();
639649
return;
640650
}
641651

642-
NIceDb::TNiceDb db(txc.DB);
643652
if (record.GetCompleted()) {
644653
LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Data shred in BSC is completed");
645654
manager->Complete();

ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure_manager.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,12 @@ void TTenantDataErasureManager::HandleNewPartitioning(const std::vector<TShardId
445445
<< ", Status# " << static_cast<ui32>(Status));
446446
}
447447

448+
void TTenantDataErasureManager::SyncBscGeneration(NIceDb::TNiceDb&, ui64) {
449+
auto ctx = SchemeShard->ActorContext();
450+
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
451+
"[TenantDataErasureManager] [SyncBscGeneration] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID());
452+
}
453+
448454
void TTenantDataErasureManager::UpdateMetrics() {
449455
SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_QUEUE_SIZE].Set(Queue->Size());
450456
SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_QUEUE_RUNNING].Set(Queue->RunningSize());
@@ -625,13 +631,13 @@ NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShar
625631
template NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShard<TEvDataShard::TEvForceDataCleanupResult::TPtr>(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev);
626632
template NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShard<TEvKeyValue::TEvCleanUpDataResponse::TPtr>(TEvKeyValue::TEvCleanUpDataResponse::TPtr& ev);
627633

628-
struct TSchemeShard::TTxAddEntryToDataErasure : public TSchemeShard::TRwTxBase {
629-
const std::vector<TShardIdx> DataErasureShards;
634+
struct TSchemeShard::TTxAddNewShardToDataErasure : public TSchemeShard::TRwTxBase {
635+
TEvPrivate::TEvAddNewShardToDataErasure::TPtr Ev;
630636
bool NeedResponseComplete = false;
631637

632-
TTxAddEntryToDataErasure(TSelf *self, const std::vector<TShardIdx>& dataErasureShards)
638+
TTxAddNewShardToDataErasure(TSelf *self, TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev)
633639
: TRwTxBase(self)
634-
, DataErasureShards(std::move(dataErasureShards))
640+
, Ev(std::move(ev))
635641
{}
636642

637643
TTxType GetTxType() const override { return TXTYPE_ADD_SHARDS_DATA_ERASURE; }
@@ -641,7 +647,7 @@ struct TSchemeShard::TTxAddEntryToDataErasure : public TSchemeShard::TRwTxBase {
641647
"TTxAddEntryToDataErasure Execute at schemestard: " << Self->TabletID());
642648

643649
NIceDb::TNiceDb db(txc.DB);
644-
Self->DataErasureManager->HandleNewPartitioning(DataErasureShards, db);
650+
Self->DataErasureManager->HandleNewPartitioning(Ev->Get()->Shards, db);
645651
if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::COMPLETED) {
646652
if (Self->DataErasureManager->IsRunning()) {
647653
NeedResponseComplete = true;
@@ -658,8 +664,8 @@ struct TSchemeShard::TTxAddEntryToDataErasure : public TSchemeShard::TRwTxBase {
658664
}
659665
};
660666

661-
NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxAddEntryToDataErasure(const std::vector<TShardIdx>& dataErasureShards) {
662-
return new TTxAddEntryToDataErasure(this, dataErasureShards);
667+
NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxAddNewShardToDataErasure(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev) {
668+
return new TTxAddNewShardToDataErasure(this, ev);
663669
}
664670

665671
struct TSchemeShard::TTxCancelDataErasureShards : public TSchemeShard::TRwTxBase {

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4978,6 +4978,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
49784978
HFuncTraced(TEvSchemeShard::TEvTenantDataErasureRequest, Handle);
49794979
HFuncTraced(TEvDataShard::TEvForceDataCleanupResult, Handle);
49804980
HFuncTraced(TEvKeyValue::TEvCleanUpDataResponse, Handle);
4981+
HFuncTraced(TEvPrivate::TEvAddNewShardToDataErasure, Handle);
49814982
HFuncTraced(TEvSchemeShard::TEvTenantDataErasureResponse, Handle);
49824983
HFuncTraced(TEvSchemeShard::TEvDataErasureInfoRequest, Handle);
49834984
HFuncTraced(TEvSchemeShard::TEvDataErasureManualStartupRequest, Handle);
@@ -7116,9 +7117,6 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T
71167117
newDataErasureShards.push_back(p.ShardIdx);
71177118
}
71187119
}
7119-
if (EnableDataErasure && DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
7120-
Execute(CreateTxAddEntryToDataErasure(newDataErasureShards), this->ActorContext());
7121-
}
71227120

71237121
for (const auto& p: oldPartitioning) {
71247122
if (!newPartitioningSet.contains(p.ShardIdx)) {
@@ -7682,6 +7680,10 @@ void TSchemeShard::Handle(TEvKeyValue::TEvCleanUpDataResponse::TPtr& ev, const T
76827680
Execute(this->CreateTxCompleteDataErasureShard(ev), ctx);
76837681
}
76847682

7683+
void TSchemeShard::Handle(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev, const TActorContext& ctx) {
7684+
Execute(CreateTxAddNewShardToDataErasure(ev), ctx);
7685+
}
7686+
76857687
void TSchemeShard::Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx) {
76867688
Execute(CreateTxCompleteDataErasureTenant(ev), ctx);
76877689
}

ydb/core/tx/schemeshard/schemeshard_impl.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,8 +1048,8 @@ class TSchemeShard
10481048
struct TTxRunDataErasure;
10491049
NTabletFlatExecutor::ITransaction* CreateTxRunDataErasure(bool isNewDataErasure);
10501050

1051-
struct TTxAddEntryToDataErasure;
1052-
NTabletFlatExecutor::ITransaction* CreateTxAddEntryToDataErasure(const std::vector<TShardIdx>& dataErasureShards);
1051+
struct TTxAddNewShardToDataErasure;
1052+
NTabletFlatExecutor::ITransaction* CreateTxAddNewShardToDataErasure(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev);
10531053

10541054
struct TTxCancelDataErasureShards;
10551055
NTabletFlatExecutor::ITransaction* CreateTxCancelDataErasureShards(const std::vector<TShardIdx>& oldShards);
@@ -1179,6 +1179,7 @@ class TSchemeShard
11791179
void Handle(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev, const TActorContext& ctx);
11801180
void Handle(TEvKeyValue::TEvCleanUpDataResponse__HandlePtr& ev, const TActorContext& ctx);
11811181
void Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx);
1182+
void Handle(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev, const TActorContext& ctx);
11821183
void Handle(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev, const TActorContext& ctx);
11831184
void Handle(TEvSchemeShard::TEvDataErasureInfoRequest::TPtr& ev, const TActorContext& ctx);
11841185
void Handle(TEvSchemeShard::TEvDataErasureManualStartupRequest::TPtr& ev, const TActorContext& ctx);

ydb/core/tx/schemeshard/schemeshard_private.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ namespace TEvPrivate {
4040
EvRetryNodeSubscribe,
4141
EvRunDataErasure,
4242
EvRunTenantDataErasure,
43+
EvAddNewShardToDataErasure,
4344
EvEnd
4445
};
4546

@@ -238,6 +239,14 @@ namespace TEvPrivate {
238239
: NodeId(nodeId)
239240
{ }
240241
};
242+
243+
struct TEvAddNewShardToDataErasure : public TEventLocal<TEvAddNewShardToDataErasure, EvAddNewShardToDataErasure> {
244+
const std::vector<TShardIdx> Shards;
245+
246+
TEvAddNewShardToDataErasure(std::vector<TShardIdx>&& shards)
247+
: Shards(std::move(shards))
248+
{}
249+
};
241250
}; // TEvPrivate
242251

243252
} // NSchemeShard

0 commit comments

Comments
 (0)