Skip to content

Commit 31a14ef

Browse files
authored
Add requested preemption event (#12814)
1 parent a580b00 commit 31a14ef

File tree

4 files changed

+65
-6
lines changed

4 files changed

+65
-6
lines changed

ydb/library/actors/core/events.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ namespace NActors {
106106
CoroTimeout,
107107
InvokeQuery,
108108
Wilson,
109+
Preemption,
109110
End,
110111

111112
// Compatibility section
@@ -128,6 +129,14 @@ namespace NActors {
128129
const ui64 Tag = 0;
129130
};
130131

132+
struct TEvPreemption: public TEventLocal<TEvPreemption, TSystem::Preemption> {
133+
bool ByEventCount = false;
134+
bool ByCycles = false;
135+
bool ByTailSend = false;
136+
ui32 EventCount = 0;
137+
ui64 Cycles = 0;
138+
};
139+
131140
struct TEvSubscribe: public TEventLocal<TEvSubscribe, TSystem::Subscribe> {
132141
};
133142

ydb/library/actors/core/executor_thread.cpp

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,26 @@ namespace NActors {
181181
SafeTypeName(actorType));
182182
}
183183

184+
ui32 TGenericExecutorThread::GetOverwrittenEventsPerMailbox() const {
185+
return Ctx.OverwrittenEventsPerMailbox;
186+
}
187+
188+
void TGenericExecutorThread::SetOverwrittenEventsPerMailbox(ui32 value) {
189+
Ctx.OverwrittenEventsPerMailbox = Max(value, Ctx.EventsPerMailbox);
190+
}
191+
192+
ui64 TGenericExecutorThread::GetOverwrittenTimePerMailboxTs() const {
193+
return Ctx.OverwrittenTimePerMailboxTs;
194+
}
195+
196+
void TGenericExecutorThread::SetOverwrittenTimePerMailboxTs(ui64 value) {
197+
Ctx.OverwrittenTimePerMailboxTs = Max(value, Ctx.TimePerMailboxTs);
198+
}
199+
200+
void TGenericExecutorThread::SubscribeToPreemption(TActorId actorId) {
201+
Ctx.PreemptionSubscribed.push_back(actorId);
202+
}
203+
184204
TGenericExecutorThread::TProcessingResult TGenericExecutorThread::Execute(TMailbox* mailbox, bool isTailExecution) {
185205
Y_DEBUG_ABORT_UNLESS(DyingActors.empty());
186206

@@ -194,16 +214,20 @@ namespace NActors {
194214
ui32 prevActivityType = std::numeric_limits<ui32>::max();
195215
TActorId recipient;
196216
bool firstEvent = true;
197-
bool preempted = false;
217+
bool preemptedByEventCount = false;
218+
bool preemptedByCycles = false;
219+
bool preemptedByTailSend = false;
198220
bool wasWorking = false;
199221
NHPTimer::STime hpnow = Ctx.HPStart;
200222
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
201223
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
202224
NHPTimer::STime eventStart = Ctx.HPStart;
203225
TlsThreadContext->ActivationStartTS.store(Ctx.HPStart, std::memory_order_release);
204226

227+
Ctx.OverwrittenEventsPerMailbox = Ctx.EventsPerMailbox;
228+
Ctx.OverwrittenTimePerMailboxTs = Ctx.TimePerMailboxTs;
205229
bool drained = false;
206-
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
230+
for (; Ctx.ExecutedEvents < Ctx.OverwrittenEventsPerMailbox; ++Ctx.ExecutedEvents) {
207231
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
208232
recipient = evExt->GetRecipientRewrite();
209233
actor = mailbox->FindActor(recipient.LocalId());
@@ -311,6 +335,7 @@ namespace NActors {
311335
Ctx.WorkerId,
312336
recipient.ToString(),
313337
SafeTypeName(actorType));
338+
preemptedByTailSend = true;
314339
break;
315340
}
316341

@@ -326,7 +351,7 @@ namespace NActors {
326351
Ctx.WorkerId,
327352
recipient.ToString(),
328353
SafeTypeName(actorType));
329-
preempted = true;
354+
preemptedByCycles = true;
330355
break;
331356
}
332357

@@ -342,7 +367,7 @@ namespace NActors {
342367
Ctx.WorkerId,
343368
recipient.ToString(),
344369
SafeTypeName(actorType));
345-
preempted = true;
370+
preemptedByCycles = true;
346371
break;
347372
}
348373

@@ -357,7 +382,7 @@ namespace NActors {
357382
Ctx.WorkerId,
358383
recipient.ToString(),
359384
SafeTypeName(actorType));
360-
preempted = true;
385+
preemptedByEventCount = true;
361386
break;
362387
}
363388
} else {
@@ -376,6 +401,22 @@ namespace NActors {
376401
break; // empty queue, leave
377402
}
378403
}
404+
if (Ctx.PreemptionSubscribed.size()) {
405+
std::unique_ptr<TEvents::TEvPreemption> event = std::make_unique<TEvents::TEvPreemption>();
406+
event->ByEventCount = preemptedByEventCount;
407+
event->ByCycles = preemptedByCycles;
408+
event->ByTailSend = preemptedByTailSend;
409+
event->EventCount = Ctx.ExecutedEvents;
410+
event->Cycles = hpnow - Ctx.HPStart;
411+
TAutoPtr<IEventHandle> ev = new IEventHandle(TActorId(), TActorId(), event.release());
412+
for (const auto& actorId : Ctx.PreemptionSubscribed) {
413+
IActor *actor = mailbox->FindActor(actorId.LocalId());
414+
if (actor) {
415+
actor->Receive(ev);
416+
}
417+
}
418+
Ctx.PreemptionSubscribed.clear();
419+
}
379420
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
380421
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
381422

@@ -387,7 +428,7 @@ namespace NActors {
387428
} else {
388429
mailbox->Unlock(Ctx.Executor, hpnow, RevolvingWriteCounter);
389430
}
390-
return {preempted, wasWorking};
431+
return {preemptedByEventCount || preemptedByCycles, wasWorking};
391432
}
392433

393434
TThreadId TGenericExecutorThread::GetThreadId() const {

ydb/library/actors/core/executor_thread.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ namespace NActors {
7878
TThreadId GetThreadId() const; // blocks, must be called after Start()
7979
TWorkerId GetWorkerId() const;
8080

81+
void SubscribeToPreemption(TActorId actorId);
82+
ui32 GetOverwrittenEventsPerMailbox() const;
83+
void SetOverwrittenEventsPerMailbox(ui32 value);
84+
ui64 GetOverwrittenTimePerMailboxTs() const;
85+
void SetOverwrittenTimePerMailboxTs(ui64 value);
86+
8187
protected:
8288
TProcessingResult ProcessExecutorPool(IExecutorPool *pool);
8389

ydb/library/actors/core/worker_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ namespace NActors {
4444
bool IsNeededToWaitNextActivation = true;
4545
i64 HPStart = 0;
4646
ui32 ExecutedEvents = 0;
47+
ui32 OverwrittenEventsPerMailbox = 0;
48+
ui64 OverwrittenTimePerMailboxTs = 0;
4749
TSharedExecutorThreadCtx *SharedThread = nullptr;
4850
TCpuSensor CpuSensor;
51+
TStackVec<TActorId, 1> PreemptionSubscribed;
4952

5053
TMailboxCache MailboxCache;
5154

0 commit comments

Comments
 (0)