22
22
namespace NKikimr ::NReplication::NService {
23
23
24
24
class TSessionInfo {
25
+ struct TWorkerInfo {
26
+ const TActorId ActorId;
27
+ TRowVersion Heartbeat;
28
+
29
+ explicit TWorkerInfo (const TActorId& actorId)
30
+ : ActorId(actorId)
31
+ {
32
+ }
33
+
34
+ operator TActorId () const {
35
+ return ActorId;
36
+ }
37
+ };
38
+
25
39
public:
26
40
explicit TSessionInfo (const TActorId& actorId)
27
41
: ActorId(actorId)
@@ -180,7 +194,40 @@ class TSessionInfo {
180
194
}
181
195
182
196
void Handle (IActorOps* ops, TEvService::TEvHeartbeat::TPtr& ev) {
183
- GetWorkerId (ev->Sender ).Serialize (*ev->Get ()->Record .MutableWorker ());
197
+ const auto id = GetWorkerId (ev->Sender );
198
+ if (!Workers.contains (id)) {
199
+ return ;
200
+ }
201
+
202
+ auto & worker = Workers.at (id);
203
+ auto & record = ev->Get ()->Record ;
204
+ const auto version = TRowVersion::FromProto (record.GetVersion ());
205
+
206
+ if (const auto & prevVersion = worker.Heartbeat ) {
207
+ if (version <= prevVersion) {
208
+ return ;
209
+ }
210
+
211
+ auto it = WorkersByHeartbeat.find (prevVersion);
212
+ if (it != WorkersByHeartbeat.end ()) {
213
+ it->second .erase (id);
214
+ if (it->second .empty ()) {
215
+ WorkersByHeartbeat.erase (it);
216
+ }
217
+ }
218
+ }
219
+
220
+ worker.Heartbeat = version;
221
+ WorkersWithHeartbeat.insert (id);
222
+ WorkersByHeartbeat[version].insert (id);
223
+
224
+ if (Workers.size () == WorkersWithHeartbeat.size ()) {
225
+ while (!TxIds.empty () && WorkersByHeartbeat.begin ()->first < TxIds.begin ()->first ) {
226
+ TxIds.erase (TxIds.begin ());
227
+ }
228
+ }
229
+
230
+ id.Serialize (*record.MutableWorker ());
184
231
ops->Send (ActorId, ev->ReleaseBase ().Release (), ev->Flags , ev->Cookie );
185
232
}
186
233
@@ -206,11 +253,13 @@ class TSessionInfo {
206
253
private:
207
254
TActorId ActorId;
208
255
ui64 Generation;
209
- THashMap<TWorkerId, TActorId > Workers;
256
+ THashMap<TWorkerId, TWorkerInfo > Workers;
210
257
THashMap<TActorId, TWorkerId> ActorIdToWorkerId;
211
258
212
259
TMap<TRowVersion, ui64> TxIds;
213
260
TMap<TRowVersion, THashSet<TActorId>> PendingTxId;
261
+ THashSet<TWorkerId> WorkersWithHeartbeat;
262
+ TMap<TRowVersion, THashSet<TWorkerId>> WorkersByHeartbeat;
214
263
215
264
}; // TSessionInfo
216
265
0 commit comments