4
4
namespace NKikimr {
5
5
namespace NHive {
6
6
7
- class TTxUpdateDcFollowers : public TTransactionBase <THive> {
8
- TDataCenterId DataCenterId;
7
+ class TTxProcessUpdateFollowers : public TTransactionBase <THive> {
9
8
TSideEffects SideEffects;
9
+
10
+ static constexpr size_t MAX_UPDATES_PROCESSED = 1000 ;
10
11
public:
11
- TTxUpdateDcFollowers ( const TDataCenterId& dataCenter, THive* hive)
12
+ TTxProcessUpdateFollowers ( THive* hive)
12
13
: TBase(hive)
13
- , DataCenterId(dataCenter)
14
14
{}
15
15
16
16
TTxType GetTxType () const override { return NHive::TXTYPE_UPDATE_DC_FOLLOWERS; }
17
17
18
18
bool Execute (TTransactionContext& txc, const TActorContext&) override {
19
- BLOG_D (" THive::TTxUpdateDcFollowers::Execute(" << DataCenterId << " )" );
20
- SideEffects.Reset (Self->SelfId ());
19
+ BLOG_D (" TTxProcessUpdateFollowers::Execute()" );
21
20
NIceDb::TNiceDb db (txc.DB );
22
- auto & dataCenter = Self->DataCenters [DataCenterId];
23
- if (!dataCenter.UpdateScheduled ) {
24
- return true ;
25
- }
26
- dataCenter.UpdateScheduled = false ;
27
- if (dataCenter.IsRegistered ()) {
28
- for (auto & [tabletId, tablet] : Self->Tablets ) {
29
- for (auto & group : tablet.FollowerGroups ) {
30
- auto & followers = dataCenter.Followers [{tabletId, group.Id }];
31
- auto neededCount = group.GetFollowerCountForDataCenter (DataCenterId);
32
- while (followers.size () < neededCount) {
33
- TFollowerTabletInfo& follower = tablet.AddFollower (group);
34
- follower.NodeFilter .AllowedDataCenters = {DataCenterId};
35
- follower.Statistics .SetLastAliveTimestamp (TlsActivationContext->Now ().MilliSeconds ());
36
- db.Table <Schema::TabletFollowerTablet>().Key (tabletId, follower.Id ).Update (
37
- NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup .Id ),
38
- NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0 ),
39
- NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics ),
40
- NIceDb::TUpdate<Schema::TabletFollowerTablet::DataCenter>(DataCenterId));
41
- follower.InitTabletMetrics ();
42
- follower.BecomeStopped ();
43
- follower.InitiateBoot ();
44
- followers.push_back (std::prev (tablet.Followers .end ()));
45
- BLOG_D (" THive::TTxUpdateDcFollowers::Execute(" << DataCenterId << " ): created follower " << follower.GetFullTabletId ());
21
+ SideEffects.Reset (Self->SelfId ());
22
+ for (size_t i = 0 ; !Self->PendingFollowerUpdates .Empty () && i < MAX_UPDATES_PROCESSED; ++i) {
23
+ auto op = Self->PendingFollowerUpdates .Pop ();
24
+ TTabletInfo* tablet = Self->FindTablet (op.TabletId );
25
+ auto & dc = Self->DataCenters [op.DataCenter ];
26
+ if (tablet == nullptr ) {
27
+ continue ;
28
+ }
29
+ switch (op.Action ) {
30
+ case TFollowerUpdates::EAction::Create:
31
+ {
32
+ if (!dc.IsRegistered ()) {
33
+ continue ;
34
+ }
35
+ TFollowerGroup& group = tablet->AsLeader ().GetFollowerGroup (op.GroupId );
36
+ auto & followers = dc.Followers [{op.TabletId .first , op.GroupId }];
37
+ if (group.GetFollowerCountForDataCenter (op.DataCenter ) <= followers.size ()) {
38
+ continue ;
46
39
}
40
+ TFollowerTabletInfo& follower = tablet->AsLeader ().AddFollower (group);
41
+ follower.NodeFilter .AllowedDataCenters = {op.DataCenter };
42
+ follower.Statistics .SetLastAliveTimestamp (TlsActivationContext->Now ().MilliSeconds ());
43
+ db.Table <Schema::TabletFollowerTablet>().Key (op.TabletId .first , follower.Id ).Update (
44
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup .Id ),
45
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0 ),
46
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics ),
47
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::DataCenter>(op.DataCenter ));
48
+ follower.InitTabletMetrics ();
49
+ follower.BecomeStopped ();
50
+ follower.InitiateBoot ();
51
+ followers.push_back (std::prev (tablet->AsLeader ().Followers .end ()));
52
+ BLOG_D (" THive::TTxProcessUpdateFollowers::Execute(): created follower " << follower.GetFullTabletId ());
53
+ break ;
47
54
}
48
- }
49
- } else {
50
- // deleting followers
51
- i64 deletedFollowers = 0 ;
52
- for (auto & [_, followers] : dataCenter.Followers ) {
53
- for (auto follower : followers) {
54
- db.Table <Schema::TabletFollowerTablet>().Key (follower->GetFullTabletId ()).Delete ();
55
- db.Table <Schema::Metrics>().Key (follower->GetFullTabletId ()).Delete ();
56
- follower->InitiateStop (SideEffects);
57
- auto & leader = follower->GetLeader ();
58
- leader.Followers .erase (follower);
59
- ++deletedFollowers;
55
+ case TFollowerUpdates::EAction::Update:
56
+ {
57
+ // This is updated in memory in LoadEverything
58
+ bool exists = db.Table <Schema::TabletFollowerTablet>().Key (op.TabletId ).Select ().IsValid ();
59
+ Y_ABORT_UNLESS (exists, " %s" , (TStringBuilder () << " trying to update tablet " << op.TabletId ).data ());
60
+ db.Table <Schema::TabletFollowerTablet>().Key (op.TabletId ).Update <Schema::TabletFollowerTablet::DataCenter>(op.DataCenter );
61
+ break ;
62
+ }
63
+ case TFollowerUpdates::EAction::Delete:
64
+ {
65
+ if (dc.IsRegistered ()) {
66
+ continue ;
67
+ }
68
+ db.Table <Schema::TabletFollowerTablet>().Key (op.TabletId ).Delete ();
69
+ db.Table <Schema::Metrics>().Key (op.TabletId ).Delete ();
70
+ tablet->InitiateStop (SideEffects);
71
+ auto & followers = dc.Followers [{op.TabletId .first , op.GroupId }]; // Note: there are at most 3 followers here, see TPartitionConfigMerger
72
+ auto iter = std::find_if (followers.begin (), followers.end (), [tabletId = op.TabletId ](const auto & fw) {
73
+ return fw->GetFullTabletId () == tabletId;
74
+ });
75
+ Y_ABORT_UNLESS (iter != followers.end ());
76
+ auto & leader = tablet->GetLeader ();
77
+ leader.Followers .erase (*iter);
78
+ followers.erase (iter);
79
+ Self->UpdateCounterTabletsTotal (-1 );
80
+ break ;
60
81
}
61
82
}
62
- BLOG_D (" THive::TTxUpdateDcFollowers::Execute(" << DataCenterId << " ): deleted " << deletedFollowers << " followers" );
63
- Self->UpdateCounterTabletsTotal (-deletedFollowers);
64
- dataCenter.Followers .clear ();
83
+ }
84
+ if (Self->PendingFollowerUpdates .Empty ()) {
85
+ Self->ProcessFollowerUpdatesScheduled = false ;
86
+ } else {
87
+ SideEffects.Send (Self->SelfId (), new TEvPrivate::TEvUpdateFollowers);
65
88
}
66
89
return true ;
67
90
}
@@ -71,8 +94,8 @@ class TTxUpdateDcFollowers : public TTransactionBase<THive> {
71
94
}
72
95
};
73
96
74
- ITransaction* THive::CreateUpdateDcFollowers ( const TDataCenterId& dc ) {
75
- return new TTxUpdateDcFollowers (dc, this );
97
+ ITransaction* THive::CreateProcessUpdateFollowers ( ) {
98
+ return new TTxProcessUpdateFollowers ( this );
76
99
}
77
100
78
101
} // NHive
0 commit comments