Skip to content

Commit b94182d

Browse files
committed
Remove adding new shards for data erasure from SetPartitioning (#19324)
1 parent 27303ec commit b94182d

7 files changed

+48
-14
lines changed

ydb/core/tx/schemeshard/schemeshard__operation_common.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "schemeshard__operation_common.h"
2+
#include "schemeshard__data_erasure_manager.h"
23

34
#include <ydb/core/blob_depot/events.h>
45
#include <ydb/core/blockstore/core/blockstore.h>
@@ -928,8 +929,16 @@ void UpdatePartitioningForCopyTable(TOperationId operationId, TTxState &txState,
928929
TShardInfo datashardInfo = TShardInfo::DataShardInfo(operationId.GetTxId(), txState.TargetPathId);
929930
datashardInfo.BindedChannels = channelsBinding;
930931

931-
context.SS->SetPartitioning(txState.TargetPathId, dstTableInfo,
932-
ApplyPartitioningCopyTable(datashardInfo, srcTableInfo, txState, context.SS));
932+
auto newPartitioning = ApplyPartitioningCopyTable(datashardInfo, srcTableInfo, txState, context.SS);
933+
TVector<TShardIdx> newShardsIdx;
934+
newShardsIdx.reserve(newPartitioning.size());
935+
for (const auto& part : newPartitioning) {
936+
newShardsIdx.push_back(part.ShardIdx);
937+
}
938+
context.SS->SetPartitioning(txState.TargetPathId, dstTableInfo, std::move(newPartitioning));
939+
if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
940+
context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx)));
941+
}
933942

934943
ui32 newShardCout = dstTableInfo->GetPartitions().size();
935944

ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "schemeshard__operation_part.h"
22
#include "schemeshard__operation_common.h"
3+
#include "schemeshard__data_erasure_manager.h"
34
#include "schemeshard_impl.h"
45
#include "schemeshard_tx_infly.h"
56
#include "schemeshard_cdc_stream_common.h"
@@ -670,11 +671,17 @@ class TCopyTable: public TSubOperation {
670671
TShardInfo datashardInfo = TShardInfo::DataShardInfo(OperationId.GetTxId(), newTable->PathId);
671672
datashardInfo.BindedChannels = channelsBinding;
672673
auto newPartition = NTableState::ApplyPartitioningCopyTable(datashardInfo, srcTableInfo, txState, context.SS);
674+
TVector<TShardIdx> newShardsIdx;
675+
newShardsIdx.reserve(newPartition.size());
673676
for (const auto& part: newPartition) {
674677
context.MemChanges.GrabNewShard(context.SS, part.ShardIdx);
675678
context.DbChanges.PersistShard(part.ShardIdx);
679+
newShardsIdx.push_back(part.ShardIdx);
676680
}
677681
context.SS->SetPartitioning(newTable->PathId, tableInfo, std::move(newPartition));
682+
if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
683+
context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx)));
684+
}
678685
for (const auto& shard : tableInfo->GetPartitions()) {
679686
Y_ABORT_UNLESS(context.SS->ShardInfos.contains(shard.ShardIdx), "shard info is set before");
680687
if (storePerShardConfig) {

ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "schemeshard__operation_part.h"
22
#include "schemeshard__operation_common.h"
3+
#include "schemeshard__data_erasure_manager.h"
34
#include "schemeshard_impl.h"
45

56
#include <ydb/core/base/subdomain.h>
@@ -244,6 +245,7 @@ class TTransferData: public TSubOperationState {
244245

245246
// Replace all Src datashard(s) with Dst datashard(s)
246247
TVector<TTableShardInfo> newPartitioning;
248+
TVector<TShardIdx> newShardsIdx;
247249
THashSet<TShardIdx> allSrcShardIdxs;
248250
for (const auto& txShard : txState->Shards) {
249251
if (txShard.Operation == TTxState::TransferData)
@@ -280,6 +282,7 @@ class TTransferData: public TSubOperationState {
280282
}
281283

282284
newPartitioning.push_back(dst);
285+
newShardsIdx.push_back(dst.ShardIdx);
283286
}
284287

285288
dstAdded = true;
@@ -293,6 +296,9 @@ class TTransferData: public TSubOperationState {
293296
// Delete the whole old partitioning and persist the whole new partitioning as the indexes have changed
294297
context.SS->PersistTablePartitioningDeletion(db, tableId, tableInfo);
295298
context.SS->SetPartitioning(tableId, tableInfo, std::move(newPartitioning));
299+
if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
300+
context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx)));
301+
}
296302
context.SS->PersistTablePartitioning(db, tableId, tableInfo);
297303
context.SS->PersistTablePartitionStats(db, tableId, tableInfo);
298304

ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure_manager.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -631,13 +631,13 @@ NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShar
631631
template NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShard<TEvDataShard::TEvForceDataCleanupResult::TPtr>(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev);
632632
template NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShard<TEvKeyValue::TEvCleanUpDataResponse::TPtr>(TEvKeyValue::TEvCleanUpDataResponse::TPtr& ev);
633633

634-
struct TSchemeShard::TTxAddEntryToDataErasure : public TSchemeShard::TRwTxBase {
635-
const std::vector<TShardIdx> DataErasureShards;
634+
struct TSchemeShard::TTxAddNewShardToDataErasure : public TSchemeShard::TRwTxBase {
635+
TEvPrivate::TEvAddNewShardToDataErasure::TPtr Ev;
636636
bool NeedResponseComplete = false;
637637

638-
TTxAddEntryToDataErasure(TSelf *self, const std::vector<TShardIdx>& dataErasureShards)
638+
TTxAddNewShardToDataErasure(TSelf *self, TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev)
639639
: TRwTxBase(self)
640-
, DataErasureShards(std::move(dataErasureShards))
640+
, Ev(std::move(ev))
641641
{}
642642

643643
TTxType GetTxType() const override { return TXTYPE_ADD_SHARDS_DATA_ERASURE; }
@@ -647,7 +647,7 @@ struct TSchemeShard::TTxAddEntryToDataErasure : public TSchemeShard::TRwTxBase {
647647
"TTxAddEntryToDataErasure Execute at schemestard: " << Self->TabletID());
648648

649649
NIceDb::TNiceDb db(txc.DB);
650-
Self->DataErasureManager->HandleNewPartitioning(DataErasureShards, db);
650+
Self->DataErasureManager->HandleNewPartitioning(Ev->Get()->Shards, db);
651651
if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::COMPLETED) {
652652
if (Self->DataErasureManager->IsRunning()) {
653653
NeedResponseComplete = true;
@@ -664,8 +664,8 @@ struct TSchemeShard::TTxAddEntryToDataErasure : public TSchemeShard::TRwTxBase {
664664
}
665665
};
666666

667-
NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxAddEntryToDataErasure(const std::vector<TShardIdx>& dataErasureShards) {
668-
return new TTxAddEntryToDataErasure(this, dataErasureShards);
667+
NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxAddNewShardToDataErasure(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev) {
668+
return new TTxAddNewShardToDataErasure(this, ev);
669669
}
670670

671671
struct TSchemeShard::TTxCancelDataErasureShards : public TSchemeShard::TRwTxBase {

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4977,6 +4977,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
49774977
HFuncTraced(TEvSchemeShard::TEvTenantDataErasureRequest, Handle);
49784978
HFuncTraced(TEvDataShard::TEvForceDataCleanupResult, Handle);
49794979
HFuncTraced(TEvKeyValue::TEvCleanUpDataResponse, Handle);
4980+
HFuncTraced(TEvPrivate::TEvAddNewShardToDataErasure, Handle);
49804981
HFuncTraced(TEvSchemeShard::TEvTenantDataErasureResponse, Handle);
49814982
HFuncTraced(TEvSchemeShard::TEvDataErasureInfoRequest, Handle);
49824983
HFuncTraced(TEvSchemeShard::TEvDataErasureManualStartupRequest, Handle);
@@ -7115,9 +7116,6 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T
71157116
newDataErasureShards.push_back(p.ShardIdx);
71167117
}
71177118
}
7118-
if (EnableDataErasure && DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
7119-
Execute(CreateTxAddEntryToDataErasure(newDataErasureShards), this->ActorContext());
7120-
}
71217119

71227120
for (const auto& p: oldPartitioning) {
71237121
if (!newPartitioningSet.contains(p.ShardIdx)) {
@@ -7678,6 +7676,10 @@ void TSchemeShard::Handle(TEvKeyValue::TEvCleanUpDataResponse::TPtr& ev, const T
76787676
Execute(this->CreateTxCompleteDataErasureShard(ev), ctx);
76797677
}
76807678

7679+
void TSchemeShard::Handle(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev, const TActorContext& ctx) {
7680+
Execute(CreateTxAddNewShardToDataErasure(ev), ctx);
7681+
}
7682+
76817683
void TSchemeShard::Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx) {
76827684
Execute(CreateTxCompleteDataErasureTenant(ev), ctx);
76837685
}

ydb/core/tx/schemeshard/schemeshard_impl.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,8 +1048,8 @@ class TSchemeShard
10481048
struct TTxRunDataErasure;
10491049
NTabletFlatExecutor::ITransaction* CreateTxRunDataErasure(bool isNewDataErasure);
10501050

1051-
struct TTxAddEntryToDataErasure;
1052-
NTabletFlatExecutor::ITransaction* CreateTxAddEntryToDataErasure(const std::vector<TShardIdx>& dataErasureShards);
1051+
struct TTxAddNewShardToDataErasure;
1052+
NTabletFlatExecutor::ITransaction* CreateTxAddNewShardToDataErasure(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev);
10531053

10541054
struct TTxCancelDataErasureShards;
10551055
NTabletFlatExecutor::ITransaction* CreateTxCancelDataErasureShards(const std::vector<TShardIdx>& oldShards);
@@ -1179,6 +1179,7 @@ class TSchemeShard
11791179
void Handle(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev, const TActorContext& ctx);
11801180
void Handle(TEvKeyValue::TEvCleanUpDataResponse__HandlePtr& ev, const TActorContext& ctx);
11811181
void Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx);
1182+
void Handle(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev, const TActorContext& ctx);
11821183
void Handle(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev, const TActorContext& ctx);
11831184
void Handle(TEvSchemeShard::TEvDataErasureInfoRequest::TPtr& ev, const TActorContext& ctx);
11841185
void Handle(TEvSchemeShard::TEvDataErasureManualStartupRequest::TPtr& ev, const TActorContext& ctx);

ydb/core/tx/schemeshard/schemeshard_private.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ namespace TEvPrivate {
4040
EvRetryNodeSubscribe,
4141
EvRunDataErasure,
4242
EvRunTenantDataErasure,
43+
EvAddNewShardToDataErasure,
4344
EvEnd
4445
};
4546

@@ -238,6 +239,14 @@ namespace TEvPrivate {
238239
: NodeId(nodeId)
239240
{ }
240241
};
242+
243+
struct TEvAddNewShardToDataErasure : public TEventLocal<TEvAddNewShardToDataErasure, EvAddNewShardToDataErasure> {
244+
const std::vector<TShardIdx> Shards;
245+
246+
TEvAddNewShardToDataErasure(std::vector<TShardIdx>&& shards)
247+
: Shards(std::move(shards))
248+
{}
249+
};
241250
}; // TEvPrivate
242251

243252
} // NSchemeShard

0 commit comments

Comments
 (0)