Skip to content

Commit 5bea5d8

Browse files
authored
Add senders readiness helpers (#9103)
1 parent 409b154 commit 5bea5d8

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

ydb/core/change_exchange/change_sender_common_ops.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ class TBaseChangeSender {
283283

284284
Y_ABORT_UNLESS(sender.Ready);
285285
sender.Ready = false;
286+
ReadySenders--;
286287

287288
sender.Pending.reserve(sender.Prepared.size());
288289
for (const auto& record : sender.Prepared) {
@@ -438,6 +439,7 @@ class TBaseChangeSender {
438439
ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
439440
}
440441
}
442+
ReadySenders = 0;
441443
}
442444

443445
void RemoveRecords() {
@@ -531,6 +533,7 @@ class TBaseChangeSender {
531533

532534
auto& sender = it->second;
533535
sender.Ready = true;
536+
ReadySenders++;
534537

535538
if (sender.Pending) {
536539
RemoveRecords(std::exchange(sender.Pending, {}));
@@ -555,6 +558,9 @@ class TBaseChangeSender {
555558
}
556559

557560
ReEnqueueRecords(it->second);
561+
if (it->second.Ready) {
562+
--ReadySenders;
563+
}
558564
Senders.erase(it);
559565
GonePartitions.push_back(partitionId);
560566

@@ -580,6 +586,10 @@ class TBaseChangeSender {
580586
, MemUsage(0)
581587
{}
582588

589+
bool IsAllSendersReady() {
590+
return ReadySenders == Senders.size();
591+
}
592+
583593
void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
584594
const auto& cgi = ev->Get()->Cgi();
585595
if (const auto& str = cgi.Get("partitionId")) {
@@ -782,6 +792,7 @@ class TBaseChangeSender {
782792
ui64 MemUsage;
783793

784794
THashMap<ui64, TSender> Senders; // ui64 is partition id
795+
ui64 ReadySenders = 0;
785796
TSet<TEnqueuedRecord> Enqueued;
786797
TSet<TIncompleteRecord> PendingBody;
787798
TMap<ui64, typename TChangeRecord::TPtr> PendingSent; // ui64 is order

0 commit comments

Comments
 (0)