Skip to content

Commit ed3bb6f

Browse files
authored
New granular timecast protocol for mediator (#7986)
1 parent 828c2ab commit ed3bb6f

File tree

10 files changed

+2034
-254
lines changed

10 files changed

+2034
-254
lines changed

ydb/core/protos/tx_mediator_timecast.proto

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,57 @@ message TEvWatch {
1010
message TEvUpdate {
1111
optional uint64 Mediator = 1;
1212
optional uint32 Bucket = 2;
13+
14+
// This is the lower bound mediator time, i.e. there are no tablets in the
15+
// given bucket that have unacknowledged transactions at the specified step
16+
// or below.
1317
optional fixed64 TimeBarrier = 4;
14-
repeated fixed64 Exemption = 5;
18+
19+
reserved 5; // was: Exemption
20+
}
21+
22+
// sent from local timecast to mediator for granular per-tablet watches
23+
message TEvGranularWatch {
24+
optional uint32 Bucket = 1;
25+
26+
// non-decreasing subscription id, identifying reconnect attempts
27+
// any subscription or change with a lower id will be dropped
28+
optional uint64 SubscriptionId = 2;
29+
30+
// The list of tablets the client is currently tracking
31+
repeated fixed64 Tablets = 3 [packed = true];
32+
}
33+
34+
// sent from local timecast to mediator to change the list of tablets
35+
message TEvGranularWatchModify {
36+
optional uint32 Bucket = 1;
37+
38+
// An updated subscription id, allowing clients to detect when updates
39+
// reflect the updated tablet list. May match current subscription id,
40+
// any message with a lower subscription id will be ignored.
41+
optional uint64 SubscriptionId = 2;
42+
43+
// The list of tablets to add or remove
44+
repeated fixed64 AddTablets = 3 [packed = true];
45+
repeated fixed64 RemoveTablets = 4 [packed = true];
46+
}
47+
48+
message TEvGranularUpdate {
49+
optional fixed64 Mediator = 1;
50+
optional uint32 Bucket = 2;
51+
optional uint64 SubscriptionId = 3;
52+
53+
// This is the upper bound mediator time, i.e. coordinators have steps
54+
// planned up to the specified step. Most idle tablets move to this time.
55+
optional uint64 LatestStep = 4;
56+
57+
// The list of tablets that have mediator time lower than the latest step
58+
// This list only includes changes within the same subscription
59+
repeated fixed64 FrozenTablets = 5 [packed = true];
60+
61+
// A frozen step for each frozen tablet
62+
repeated uint64 FrozenSteps = 6 [packed = true];
63+
64+
// The list of previously frozen tablets that now match the latest step
65+
repeated fixed64 UnfrozenTablets = 7 [packed = true];
1566
}

ydb/core/tx/mediator/execute_queue.cpp

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,55 @@ namespace NTxMediator {
154154
const NKikimrTxMediatorTimecast::TEvWatch &record = ev->Get()->Record;
155155
// todo: check config coherence
156156
const TActorId &sender = ev->Sender;
157+
const TActorId &server = ev->Recipient;
157158
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_EXEC_QUEUE, "Actor# " << ctx.SelfID.ToString()
158159
<< " MediatorId# " << MediatorId << " HANDLE TEvWatch");
159160

160161
for (ui32 bucketIdx : record.GetBucket()) {
161162
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_EXEC_QUEUE, "Actor# " << ctx.SelfID.ToString()
162163
<< " MediatorId# " << MediatorId << " SEND TEvWatchBucket to# "
163164
<< Buckets[bucketIdx].ActiveActor.ToString() << " bucket.ActiveActor");
164-
ctx.Send(Buckets[bucketIdx].ActiveActor, new TEvTxMediator::TEvWatchBucket(sender));
165+
ctx.Send(Buckets[bucketIdx].ActiveActor, new TEvTxMediator::TEvWatchBucket(sender, server));
166+
}
167+
}
168+
169+
void Handle(TEvMediatorTimecast::TEvGranularWatch::TPtr &ev, const TActorContext &ctx) {
170+
const auto &record = ev->Get()->Record;
171+
const ui32 bucketIdx = record.GetBucket();
172+
173+
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_EXEC_QUEUE, "Actor# " << ctx.SelfID
174+
<< " MediatorId# " << MediatorId << " HANDLE TEvGranularWatch from# " << ev->Sender
175+
<< " bucket# " << bucketIdx);
176+
177+
if (bucketIdx < Buckets.size()) {
178+
ev->Rewrite(ev->GetTypeRewrite(), Buckets[bucketIdx].ActiveActor);
179+
ctx.ExecutorThread.Send(ev.Release());
180+
}
181+
}
182+
183+
void Handle(TEvMediatorTimecast::TEvGranularWatchModify::TPtr &ev, const TActorContext &ctx) {
184+
const auto &record = ev->Get()->Record;
185+
const ui32 bucketIdx = record.GetBucket();
186+
187+
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_EXEC_QUEUE, "Actor# " << ctx.SelfID
188+
<< " MediatorId# " << MediatorId << " HANDLE TEvGranularWatchModify from# " << ev->Sender
189+
<< " bucket# " << bucketIdx);
190+
191+
if (bucketIdx < Buckets.size()) {
192+
ev->Rewrite(ev->GetTypeRewrite(), Buckets[bucketIdx].ActiveActor);
193+
ctx.ExecutorThread.Send(ev.Release());
194+
}
195+
}
196+
197+
void Handle(TEvTxMediator::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx) {
198+
const auto* msg = ev->Get();
199+
200+
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_EXEC_QUEUE, "Actor# " << ctx.SelfID
201+
<< " MediatorId# " << MediatorId << " HANDLE TEvServerDisconnected server# " << msg->ServerId);
202+
203+
// Broadcast to buckets
204+
for (const TBucket &bucket : Buckets) {
205+
ctx.Send(bucket.ActiveActor, new TEvTxMediator::TEvServerDisconnected(msg->ServerId));
165206
}
166207
}
167208

@@ -196,6 +237,9 @@ namespace NTxMediator {
196237
HFunc(TEvTxMediator::TEvCommitStep, Handle);
197238
HFunc(TEvTxMediator::TEvRequestLostAcks, Handle);
198239
HFunc(TEvMediatorTimecast::TEvWatch, Handle);
240+
HFunc(TEvMediatorTimecast::TEvGranularWatch, Handle);
241+
HFunc(TEvMediatorTimecast::TEvGranularWatchModify, Handle);
242+
HFunc(TEvTxMediator::TEvServerDisconnected, Handle);
199243
CFunc(TEvents::TSystem::PoisonPill, Die);
200244
CFunc(TEvents::TSystem::Bootstrap, Bootstrap);
201245
}

ydb/core/tx/mediator/mediator_impl.cpp

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ void TTxMediator::InitSelfState(const TActorContext &ctx) {
5555
}
5656

5757
ReplyEnqueuedSyncs(ctx);
58-
ReplyEnqueuedWatch(ctx);
58+
ProcessEnqueuedWatch(ctx);
5959
}
6060

6161
void TTxMediator::ReplyEnqueuedSyncs(const TActorContext &ctx) {
@@ -65,15 +65,12 @@ void TTxMediator::ReplyEnqueuedSyncs(const TActorContext &ctx) {
6565
CoordinatorsSyncEnqueued.clear();
6666
}
6767

68-
void TTxMediator::ReplyEnqueuedWatch(const TActorContext &ctx) {
69-
for (auto &x: WatchEnqueued) {
70-
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR,
71-
"tablet# " << TabletID() <<
72-
" ReplyEnqueuedWatch" <<
73-
" SEND EvWatch to# " << ExecQueue.ToString() << " ExecQueue");
74-
ctx.Send(x->Forward(ExecQueue));
68+
void TTxMediator::ProcessEnqueuedWatch(const TActorContext &ctx) {
69+
Y_UNUSED(ctx);
70+
for (auto &ev: EnqueuedWatch) {
71+
StateWork(ev);
7572
}
76-
WatchEnqueued.clear();
73+
EnqueuedWatch.clear();
7774
}
7875

7976
void TTxMediator::ReplySync(const TActorId &sender, const NKikimrTx::TEvCoordinatorSync &record, const TActorContext &ctx) {
@@ -102,9 +99,9 @@ void TTxMediator::HandleEnqueue(TEvTxCoordinator::TEvCoordinatorSync::TPtr &ev,
10299
CoordinatorsSyncEnqueued[ev->Sender] = ev->Get()->Record;
103100
}
104101

105-
void TTxMediator::HandleEnqueue(TEvMediatorTimecast::TEvWatch::TPtr &ev, const TActorContext &ctx) {
106-
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR, "tablet# " << TabletID() << " HANDLE Enqueue EvWatch");
107-
WatchEnqueued.push_back(ev);
102+
void TTxMediator::HandleEnqueueWatch(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx) {
103+
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR, "tablet# " << TabletID() << " ENQUEUE Watch from# " << ev->Sender << " server# " << ev->Recipient);
104+
EnqueuedWatch.push_back(std::move(ev));
108105
}
109106

110107
void TTxMediator::DoConfigure(const TEvSubDomain::TEvConfigure &ev, const TActorContext &ctx, const TActorId &ackTo) {
@@ -299,11 +296,34 @@ void TTxMediator::Handle(TEvTxCoordinator::TEvCoordinatorStep::TPtr &ev, const T
299296
return ProcessForeignStep(ev->Sender, record, coordinator, VolatileState.Foreign[coordinator], ctx);
300297
}
301298

302-
void TTxMediator::Handle(TEvMediatorTimecast::TEvWatch::TPtr &ev, const TActorContext &ctx) {
303-
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR, "tablet# " << TabletID() << " HANDLE EvWatch");
304-
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR, "tablet# " << TabletID() << " SEND EvWatch to# "
305-
<< ExecQueue.ToString() << " ExecQueue");
306-
ctx.ExecutorThread.Send(ev->Forward(ExecQueue));
299+
void TTxMediator::HandleForwardWatch(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx) {
300+
if (!ConnectedServers.contains(ev->Recipient)) {
301+
// Server disconnected before this message could be processed
302+
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR, "tablet# " << TabletID()
303+
<< " IGNORE Watch from# " << ev->Sender << " server# " << ev->Recipient);
304+
return;
305+
}
306+
307+
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR, "tablet# " << TabletID()
308+
<< " FORWARD Watch from# " << ev->Sender << " to# " << ExecQueue.ToString() << " ExecQueue");
309+
// Preserve Recipient (server) and InterconnectSession
310+
ev->Rewrite(ev->GetTypeRewrite(), ExecQueue);
311+
ctx.ExecutorThread.Send(ev.Release());
312+
}
313+
314+
void TTxMediator::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx) {
315+
auto* msg = ev->Get();
316+
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR, "tablet# " << TabletID() << " server# " << msg->ServerId << " connected");
317+
ConnectedServers.insert(msg->ServerId);
318+
}
319+
320+
void TTxMediator::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx) {
321+
auto* msg = ev->Get();
322+
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR, "tablet# " << TabletID() << " server# " << msg->ServerId << " disconnnected");
323+
ConnectedServers.erase(msg->ServerId);
324+
if (!!ExecQueue) {
325+
Send(ExecQueue, new TEvTxMediator::TEvServerDisconnected(msg->ServerId));
326+
}
307327
}
308328

309329
TTxMediator::TTxMediator(TTabletStorageInfo *info, const TActorId &tablet)

ydb/core/tx/mediator/mediator_impl.h

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ struct TEvTxMediator {
136136
EvStepPlanComplete,
137137
EvOoOTabletStep,
138138
EvWatchBucket,
139+
EvServerDisconnected,
139140

140141
EvEnd
141142
};
@@ -241,9 +242,11 @@ struct TEvTxMediator {
241242

242243
struct TEvWatchBucket : public TEventLocal<TEvWatchBucket, EvWatchBucket> {
243244
const TActorId Source;
245+
const TActorId ServerId;
244246

245-
TEvWatchBucket(const TActorId &source)
247+
TEvWatchBucket(const TActorId &source, const TActorId &serverId)
246248
: Source(source)
249+
, ServerId(serverId)
247250
{}
248251

249252
TString ToString() const {
@@ -253,6 +256,14 @@ struct TEvTxMediator {
253256
return str.Str();
254257
}
255258
};
259+
260+
struct TEvServerDisconnected : public TEventLocal<TEvServerDisconnected, EvServerDisconnected> {
261+
const TActorId ServerId;
262+
263+
TEvServerDisconnected(const TActorId &serverId)
264+
: ServerId(serverId)
265+
{}
266+
};
256267
};
257268

258269
namespace NTxMediator {
@@ -320,7 +331,9 @@ class TTxMediator : public TActor<TTxMediator>, public NTabletFlatExecutor::TTab
320331
TActorId ExecQueue;
321332

322333
THashMap<TActorId, NKikimrTx::TEvCoordinatorSync> CoordinatorsSyncEnqueued;
323-
TVector<TEvMediatorTimecast::TEvWatch::TPtr> WatchEnqueued;
334+
TVector<TAutoPtr<IEventHandle>> EnqueuedWatch;
335+
336+
THashSet<TActorId> ConnectedServers;
324337

325338
void Die(const TActorContext &ctx) override;
326339
void OnActivateExecutor(const TActorContext &ctx) override;
@@ -330,17 +343,20 @@ class TTxMediator : public TActor<TTxMediator>, public NTabletFlatExecutor::TTab
330343

331344
void Handle(TEvSubDomain::TEvConfigure::TPtr &ev, const TActorContext &ctx);
332345
void HandleEnqueue(TEvTxCoordinator::TEvCoordinatorSync::TPtr &ev, const TActorContext &ctx);
333-
void HandleEnqueue(TEvMediatorTimecast::TEvWatch::TPtr &ev, const TActorContext &ctx);
346+
void HandleEnqueueWatch(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx);
334347
void Handle(TEvTxCoordinator::TEvCoordinatorSync::TPtr &ev, const TActorContext &ctx);
335348
void Handle(TEvTxCoordinator::TEvCoordinatorStep::TPtr &ev, const TActorContext &ctx);
336-
void Handle(TEvMediatorTimecast::TEvWatch::TPtr &ev, const TActorContext &ctx);
349+
void HandleForwardWatch(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx);
350+
351+
void Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx);
352+
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx);
337353

338354
void DoConfigure(const TEvSubDomain::TEvConfigure &ev, const TActorContext &ctx, const TActorId &ackTo = TActorId());
339355

340356
static ui64 SubjectiveTime();
341357
void InitSelfState(const TActorContext &ctx);
342358
void ReplyEnqueuedSyncs(const TActorContext &ctx);
343-
void ReplyEnqueuedWatch(const TActorContext &ctx);
359+
void ProcessEnqueuedWatch(const TActorContext &ctx);
344360

345361

346362
void ReplySync(const TActorId &sender, const NKikimrTx::TEvCoordinatorSync &record, const TActorContext &ctx);
@@ -393,18 +409,22 @@ class TTxMediator : public TActor<TTxMediator>, public NTabletFlatExecutor::TTab
393409
STFUNC_TABLET_DEF(StateSync,
394410
HFunc(TEvTxCoordinator::TEvCoordinatorSync, HandleEnqueue)
395411
HFunc(TEvSubDomain::TEvConfigure, Handle)
396-
HFunc(TEvMediatorTimecast::TEvWatch, HandleEnqueue)
397-
IgnoreFunc(TEvTabletPipe::TEvServerConnected)
398-
IgnoreFunc(TEvTabletPipe::TEvServerDisconnected))
412+
FFunc(TEvMediatorTimecast::TEvWatch::EventType, HandleEnqueueWatch)
413+
FFunc(TEvMediatorTimecast::TEvGranularWatch::EventType, HandleEnqueueWatch)
414+
FFunc(TEvMediatorTimecast::TEvGranularWatchModify::EventType, HandleEnqueueWatch)
415+
HFunc(TEvTabletPipe::TEvServerConnected, Handle)
416+
HFunc(TEvTabletPipe::TEvServerDisconnected, Handle))
399417

400418
STFUNC_TABLET_DEF(StateWork,
401419
HFunc(TEvSubDomain::TEvConfigure, Handle)
402420
HFunc(TEvTxCoordinator::TEvCoordinatorStep, Handle)
403421
HFunc(TEvTxCoordinator::TEvCoordinatorSync, Handle)
404-
HFunc(TEvMediatorTimecast::TEvWatch, Handle)
422+
FFunc(TEvMediatorTimecast::TEvWatch::EventType, HandleForwardWatch)
423+
FFunc(TEvMediatorTimecast::TEvGranularWatch::EventType, HandleForwardWatch)
424+
FFunc(TEvMediatorTimecast::TEvGranularWatchModify::EventType, HandleForwardWatch)
405425
HFunc(NMon::TEvRemoteHttpInfo, RenderHtmlPage)
406-
IgnoreFunc(TEvTabletPipe::TEvServerConnected)
407-
IgnoreFunc(TEvTabletPipe::TEvServerDisconnected))
426+
HFunc(TEvTabletPipe::TEvServerConnected, Handle)
427+
HFunc(TEvTabletPipe::TEvServerDisconnected, Handle))
408428

409429
STFUNC_TABLET_IGN(StateBroken,)
410430
};

0 commit comments

Comments
 (0)