Skip to content

Commit bcf8689

Browse files
authored
Add incr restore change sender (#9603)
1 parent 866f44f commit bcf8689

13 files changed

+1091
-631
lines changed

ydb/core/change_exchange/change_sender.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
namespace NKikimr::NChangeExchange {
1111

1212
void TChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId) {
13+
++UninitSenders;
1314
auto res = senders.emplace(partitionId, TSender{});
1415
Y_ABORT_UNLESS(res.second);
1516

@@ -27,6 +28,7 @@ void TChangeSender::RegisterSender(ui64 partitionId) {
2728

2829
Y_ABORT_UNLESS(!sender.ActorId);
2930
sender.ActorId = ActorOps->RegisterWithSameMailbox(SenderFactory->CreateSender(partitionId));
31+
--UninitSenders;
3032
}
3133

3234
void TChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) {

ydb/core/change_exchange/change_sender.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ class TChangeSender {
174174
return ReadySenders == Senders.size();
175175
}
176176

177+
inline bool IsAllSendersReadyOrUninit() {
178+
return ReadySenders + UninitSenders == Senders.size();
179+
}
180+
177181
void SetPartitionResolver(IPartitionResolverVisitor* partitionResolver) {
178182
PartitionResolver.Reset(partitionResolver);
179183
}
@@ -194,6 +198,7 @@ class TChangeSender {
194198

195199
THashMap<ui64, TSender> Senders; // ui64 is partition id
196200
ui64 ReadySenders = 0;
201+
ui64 UninitSenders = 0;
197202
TSet<TEnqueuedRecord> Enqueued;
198203
TSet<TIncompleteRecord> PendingBody;
199204
TMap<ui64, IChangeRecord::TPtr> PendingSent; // ui64 is order

ydb/core/tx/datashard/change_exchange_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace NDataShard {
88

99
IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId);
1010
IActor* CreateCdcStreamChangeSender(const TDataShardId& dataShard, const TPathId& streamPathId);
11+
IActor* CreateIncrRestoreChangeSender(const TActorId& changeServerActor, const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& restoreTargetPathId);
1112

1213
} // NDataShard
1314
} // NKikimr

0 commit comments

Comments
 (0)