@@ -333,7 +333,12 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
333
333
if (bucket.Tablets .empty ()) {
334
334
// There are no tablets interested in this bucket, avoid unnecessary watches
335
335
bucket.WatchSent = false ;
336
- bucket.Waiters = { };
336
+ if (!bucket.Waiters .empty ()) {
337
+ bucket.Waiters = { };
338
+ }
339
+ if (!bucket.TabletWaiters .empty ()) {
340
+ bucket.TabletWaiters .clear ();
341
+ }
337
342
continue ;
338
343
}
339
344
@@ -628,48 +633,46 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr& ev, co
628
633
Y_ABORT_UNLESS (record.GetBucket () < mediator.BucketsSz );
629
634
auto & bucket = mediator.Buckets [record.GetBucket ()];
630
635
const ui64 step = record.GetTimeBarrier ();
631
- switch (bucket.SafeStep .RefCount ()) {
632
- case 0 :
633
- break ;
634
- case 1 :
636
+ bucket.SafeStep ->Set (step);
637
+ if (bucket.Tablets .empty ()) {
638
+ if (!bucket.Waiters .empty ()) {
635
639
bucket.Waiters = { };
636
- bucket.TabletWaiters = { };
637
- [[fallthrough]];
638
- default : {
639
- bucket.SafeStep ->Set (step);
640
- THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
641
- while (!bucket.Waiters .empty ()) {
642
- const auto & top = bucket.Waiters .top ();
643
- if (step < top.PlanStep ) {
644
- break ;
645
- }
646
- if (processed.insert (std::make_pair (top.Sender , top.TabletId )).second ) {
647
- ctx.Send (top.Sender , new TEvMediatorTimecast::TEvNotifyPlanStep (top.TabletId , step));
648
- }
649
- bucket.Waiters .pop ();
640
+ }
641
+ if (!bucket.TabletWaiters .empty ()) {
642
+ bucket.TabletWaiters .clear ();
643
+ }
644
+ } else {
645
+ THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
646
+ while (!bucket.Waiters .empty ()) {
647
+ const auto & top = bucket.Waiters .top ();
648
+ if (step < top.PlanStep ) {
649
+ break ;
650
650
}
651
- while (!bucket. TabletWaiters . empty () ) {
652
- const auto & candidate = *bucket. TabletWaiters . begin ( );
653
- if (step < candidate. PlanStep ) {
654
- break ;
655
- }
656
- auto it = Tablets. find (candidate. TabletId );
657
- if (it != Tablets. end ()) {
658
- auto & tabletInfo = it-> second ;
659
- while (!tabletInfo. Waiters . empty ()) {
660
- const auto & top = *tabletInfo. Waiters . begin ();
661
- if (step < top. PlanStep ) {
662
- break ;
663
- }
664
- if (processed. insert ( std::make_pair (top. Sender , top. TabletId )). second ) {
665
- ctx. Send (top. Sender , new TEvMediatorTimecast::TEvNotifyPlanStep (top. TabletId , step) );
666
- }
667
- tabletInfo. Waiters . erase (tabletInfo. Waiters . begin ()) ;
651
+ if (processed. insert ( std::make_pair (top. Sender , top. TabletId )). second ) {
652
+ ctx. Send (top. Sender , new TEvMediatorTimecast::TEvNotifyPlanStep (top. TabletId , step) );
653
+ }
654
+ bucket. Waiters . pop () ;
655
+ }
656
+ while (!bucket. TabletWaiters . empty ()) {
657
+ const auto & candidate = *bucket. TabletWaiters . begin ();
658
+ if (step < candidate. PlanStep ) {
659
+ break ;
660
+ }
661
+ auto it = Tablets. find (candidate. TabletId );
662
+ if (it != Tablets. end ()) {
663
+ auto & tabletInfo = it-> second ;
664
+ while (!tabletInfo. Waiters . empty () ) {
665
+ const auto & top = *tabletInfo. Waiters . begin ( );
666
+ if (step < top. PlanStep ) {
667
+ break ;
668
668
}
669
+ if (processed.insert (std::make_pair (top.Sender , top.TabletId )).second ) {
670
+ ctx.Send (top.Sender , new TEvMediatorTimecast::TEvNotifyPlanStep (top.TabletId , step));
671
+ }
672
+ tabletInfo.Waiters .erase (tabletInfo.Waiters .begin ());
669
673
}
670
- bucket.TabletWaiters .erase (bucket.TabletWaiters .begin ());
671
674
}
672
- break ;
675
+ bucket. TabletWaiters . erase (bucket. TabletWaiters . begin ()) ;
673
676
}
674
677
}
675
678
for (ui64 coordinatorId : mediator.Coordinators ) {
0 commit comments