Skip to content

Commit 3ba0b36

Browse files
snauryblinkov
authored andcommitted
Avoid relying on refcount for mediator bucket cleanup (#14806)
1 parent d54b7d9 commit 3ba0b36

File tree

1 file changed

+41
-38
lines changed

1 file changed

+41
-38
lines changed

ydb/core/tx/time_cast/time_cast.cpp

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,12 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
333333
if (bucket.Tablets.empty()) {
334334
// There are no tablets interested in this bucket, avoid unnecessary watches
335335
bucket.WatchSent = false;
336-
bucket.Waiters = { };
336+
if (!bucket.Waiters.empty()) {
337+
bucket.Waiters = { };
338+
}
339+
if (!bucket.TabletWaiters.empty()) {
340+
bucket.TabletWaiters.clear();
341+
}
337342
continue;
338343
}
339344

@@ -628,48 +633,46 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr& ev, co
628633
Y_ABORT_UNLESS(record.GetBucket() < mediator.BucketsSz);
629634
auto& bucket = mediator.Buckets[record.GetBucket()];
630635
const ui64 step = record.GetTimeBarrier();
631-
switch (bucket.SafeStep.RefCount()) {
632-
case 0:
633-
break;
634-
case 1:
636+
bucket.SafeStep->Set(step);
637+
if (bucket.Tablets.empty()) {
638+
if (!bucket.Waiters.empty()) {
635639
bucket.Waiters = { };
636-
bucket.TabletWaiters = { };
637-
[[fallthrough]];
638-
default: {
639-
bucket.SafeStep->Set(step);
640-
THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
641-
while (!bucket.Waiters.empty()) {
642-
const auto& top = bucket.Waiters.top();
643-
if (step < top.PlanStep) {
644-
break;
645-
}
646-
if (processed.insert(std::make_pair(top.Sender, top.TabletId)).second) {
647-
ctx.Send(top.Sender, new TEvMediatorTimecast::TEvNotifyPlanStep(top.TabletId, step));
648-
}
649-
bucket.Waiters.pop();
640+
}
641+
if (!bucket.TabletWaiters.empty()) {
642+
bucket.TabletWaiters.clear();
643+
}
644+
} else {
645+
THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
646+
while (!bucket.Waiters.empty()) {
647+
const auto& top = bucket.Waiters.top();
648+
if (step < top.PlanStep) {
649+
break;
650650
}
651-
while (!bucket.TabletWaiters.empty()) {
652-
const auto& candidate = *bucket.TabletWaiters.begin();
653-
if (step < candidate.PlanStep) {
654-
break;
655-
}
656-
auto it = Tablets.find(candidate.TabletId);
657-
if (it != Tablets.end()) {
658-
auto& tabletInfo = it->second;
659-
while (!tabletInfo.Waiters.empty()) {
660-
const auto& top = *tabletInfo.Waiters.begin();
661-
if (step < top.PlanStep) {
662-
break;
663-
}
664-
if (processed.insert(std::make_pair(top.Sender, top.TabletId)).second) {
665-
ctx.Send(top.Sender, new TEvMediatorTimecast::TEvNotifyPlanStep(top.TabletId, step));
666-
}
667-
tabletInfo.Waiters.erase(tabletInfo.Waiters.begin());
651+
if (processed.insert(std::make_pair(top.Sender, top.TabletId)).second) {
652+
ctx.Send(top.Sender, new TEvMediatorTimecast::TEvNotifyPlanStep(top.TabletId, step));
653+
}
654+
bucket.Waiters.pop();
655+
}
656+
while (!bucket.TabletWaiters.empty()) {
657+
const auto& candidate = *bucket.TabletWaiters.begin();
658+
if (step < candidate.PlanStep) {
659+
break;
660+
}
661+
auto it = Tablets.find(candidate.TabletId);
662+
if (it != Tablets.end()) {
663+
auto& tabletInfo = it->second;
664+
while (!tabletInfo.Waiters.empty()) {
665+
const auto& top = *tabletInfo.Waiters.begin();
666+
if (step < top.PlanStep) {
667+
break;
668668
}
669+
if (processed.insert(std::make_pair(top.Sender, top.TabletId)).second) {
670+
ctx.Send(top.Sender, new TEvMediatorTimecast::TEvNotifyPlanStep(top.TabletId, step));
671+
}
672+
tabletInfo.Waiters.erase(tabletInfo.Waiters.begin());
669673
}
670-
bucket.TabletWaiters.erase(bucket.TabletWaiters.begin());
671674
}
672-
break;
675+
bucket.TabletWaiters.erase(bucket.TabletWaiters.begin());
673676
}
674677
}
675678
for (ui64 coordinatorId : mediator.Coordinators) {

0 commit comments

Comments
 (0)