Skip to content

Commit 7818abc

Browse files
authored
Lock-free mailboxes with intrusive event lists (#11419)
1 parent 1727b44 commit 7818abc

39 files changed

+1939
-1995
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -776,8 +776,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
776776
}
777777

778778
LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size());
779-
bool sent = this->Send(computeActorId, channelsInfoEv.Release());
780-
YQL_ENSURE(sent, "Failed to send event to " << computeActorId.ToString());
779+
this->Send(computeActorId, channelsInfoEv.Release());
781780
}
782781
}
783782

ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,9 @@ class TSnapshotManagerActor: public TActorBootstrapped<TSnapshotManagerActor> {
117117

118118
LOG_D("KqpSnapshotManager: snapshot: " << Snapshot << " acquired");
119119

120-
bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
120+
Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
121121
Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)),
122122
0, Cookie);
123-
Y_DEBUG_ABORT_UNLESS(sent);
124123

125124
PassAway();
126125
} else {
@@ -172,10 +171,9 @@ class TSnapshotManagerActor: public TActorBootstrapped<TSnapshotManagerActor> {
172171

173172
LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " created");
174173

175-
bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
174+
Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
176175
Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)),
177176
0, Cookie);
178-
Y_DEBUG_ABORT_UNLESS(sent);
179177

180178
Become(&TThis::StateRefreshing);
181179
ScheduleRefresh();

ydb/core/testlib/actor_helpers.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace NKikimr {
55
TActorSystemStub::TActorSystemStub() {
66
THolder<NActors::TActorSystemSetup> setup(new NActors::TActorSystemSetup);
77
System.Reset(new NActors::TActorSystem(setup));
8-
Mailbox.Reset(new NActors::TMailboxHeader(NActors::TMailboxType::Simple));
8+
Mailbox.Reset(new NActors::TMailbox());
99
ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, nullptr, "thread"));
1010
Ctx.Reset(new NActors::TActorContext(*Mailbox, *ExecutorThread, GetCycleCountFast(), SelfID));
1111
PrevCtx = NActors::TlsActivationContext;

ydb/core/testlib/actor_helpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace NKikimr {
88

99
struct TActorSystemStub {
1010
THolder<NActors::TActorSystem> System;
11-
THolder<NActors::TMailboxHeader> Mailbox;
11+
THolder<NActors::TMailbox> Mailbox;
1212
THolder<NActors::TExecutorThread> ExecutorThread;
1313
NActors::TActorId SelfID;
1414
THolder<NActors::TActorContext> Ctx;

ydb/core/util/actorsys_test/testactorsys.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,13 @@ class TTestExecutorPool : public IExecutorPool {
6464
, NodeId(nodeId)
6565
{}
6666

67-
ui32 GetReadyActivation(TWorkerContext& /*wctx*/, ui64 /*revolvingCounter*/) override {
67+
TMailbox* GetReadyActivation(TWorkerContext& /*wctx*/, ui64 /*revolvingCounter*/) override {
6868
Y_ABORT();
6969
}
7070

71-
void ReclaimMailbox(TMailboxType::EType /*mailboxType*/, ui32 /*hint*/, NActors::TWorkerId /*workerId*/, ui64 /*revolvingCounter*/) override {
72-
Y_ABORT();
73-
}
74-
75-
TMailboxHeader *ResolveMailbox(ui32 hint) override {
71+
TMailbox* ResolveMailbox(ui32 hint) override {
7672
const auto it = Context->Mailboxes.find({NodeId, PoolId, hint});
77-
return it != Context->Mailboxes.end() ? &it->second.Header : nullptr;
73+
return it != Context->Mailboxes.end() ? &it->second : nullptr;
7874
}
7975

8076
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, NActors::TWorkerId /*workerId*/) override {
@@ -105,24 +101,28 @@ class TTestExecutorPool : public IExecutorPool {
105101
return Send(ev);
106102
}
107103

108-
void ScheduleActivation(ui32 /*activation*/) override {
104+
void ScheduleActivation(TMailbox* /*mailbox*/) override {
109105
Y_ABORT();
110106
}
111107

112-
void SpecificScheduleActivation(ui32 /*activation*/) override {
108+
void SpecificScheduleActivation(TMailbox* /*mailbox*/) override {
113109
Y_ABORT();
114110
}
115111

116-
void ScheduleActivationEx(ui32 /*activation*/, ui64 /*revolvingCounter*/) override {
112+
void ScheduleActivationEx(TMailbox* /*mailbox*/, ui64 /*revolvingCounter*/) override {
117113
Y_ABORT();
118114
}
119115

120116
TActorId Register(IActor* actor, TMailboxType::EType /*mailboxType*/, ui64 /*revolvingCounter*/, const TActorId& parentId) override {
121117
return Context->Register(actor, parentId, PoolId, std::nullopt, NodeId);
122118
}
123119

124-
TActorId Register(IActor* actor, TMailboxHeader* /*mailbox*/, ui32 hint, const TActorId& parentId) override {
125-
return Context->Register(actor, parentId, PoolId, hint, NodeId);
120+
TActorId Register(IActor* actor, TMailboxCache& /*cache*/, ui64 /*revolvingCounter*/, const TActorId& parentId) override {
121+
return Context->Register(actor, parentId, PoolId, std::nullopt, NodeId);
122+
}
123+
124+
TActorId Register(IActor* actor, TMailbox* mailbox, const TActorId& parentId) override {
125+
return Context->Register(actor, parentId, PoolId, mailbox->Hint, NodeId);
126126
}
127127

128128
void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {

ydb/core/util/actorsys_test/testactorsys.h

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ class TTestActorSystem {
105105
ui32 NextHint = 1;
106106
};
107107

108-
struct TMailboxInfo {
109-
TMailboxHeader Header{TMailboxType::Simple};
108+
struct TMailboxInfo : public TMailbox {
110109
};
111110

112111
struct TAppDataInfo {
@@ -311,8 +310,8 @@ class TTestActorSystem {
311310
for (auto it = Mailboxes.begin(); it != Mailboxes.end(); ) {
312311
if (from <= it->first && it->first < to) {
313312
TMailboxInfo& mbox = it->second;
314-
mbox.Header.ForEach([&](ui64 /*actorId*/, IActor *actor) { ActorName.erase(actor); });
315-
mbox.Header.CleanupActors();
313+
mbox.ForEach([&](ui64 /*actorId*/, IActor *actor) { ActorName.erase(actor); });
314+
mbox.CleanupActors();
316315
it = Mailboxes.erase(it);
317316
found = true;
318317
} else {
@@ -442,13 +441,13 @@ class TTestActorSystem {
442441
}
443442
}
444443

445-
IActor *GetActor(const TActorId& actorId, TMailboxHeader **header = nullptr) {
444+
IActor *GetActor(const TActorId& actorId, TMailbox **mailbox = nullptr) {
446445
if (const auto it = Mailboxes.find(actorId); it != Mailboxes.end()) {
447446
TMailboxInfo& mbox = it->second;
448-
if (header) {
449-
*header = &mbox.Header;
447+
if (mailbox) {
448+
*mailbox = &mbox;
450449
}
451-
return mbox.Header.FindActor(actorId.LocalId());
450+
return mbox.FindActor(actorId.LocalId());
452451
} else {
453452
return nullptr;
454453
}
@@ -495,7 +494,8 @@ class TTestActorSystem {
495494
// register actor in mailbox
496495
const auto& it = Mailboxes.try_emplace(TMailboxId(nodeId, poolId, mboxId)).first;
497496
TMailboxInfo& mbox = it->second;
498-
mbox.Header.AttachActor(ActorLocalId, actor);
497+
mbox.Hint = mboxId;
498+
mbox.AttachActor(ActorLocalId, actor);
499499

500500
// generate actor id
501501
const TActorId actorId(nodeId, poolId, ActorLocalId, mboxId);
@@ -595,15 +595,15 @@ class TTestActorSystem {
595595
return false;
596596
}
597597
TMailboxInfo& mbox = mboxIt->second;
598-
if (IActor *actor = mbox.Header.FindActor(actorId.LocalId())) {
598+
if (IActor *actor = mbox.FindActor(actorId.LocalId())) {
599599
// obtain node info for this actor
600600
TPerNodeInfo *info = GetNode(actorId.NodeId());
601601

602602
// adjust clock for correct operation
603603
info->SchedulerThread->AdjustClock(Clock);
604604

605605
// allocate context and store its reference in TLS
606-
TActorContext ctx(mbox.Header, *info->ExecutorThread, GetCycleCountFast(), actorId);
606+
TActorContext ctx(mbox, *info->ExecutorThread, GetCycleCountFast(), actorId);
607607
TlsActivationContext = &ctx;
608608
CurrentRecipient = actorId;
609609
CurrentNodeId = actorId.NodeId();
@@ -641,7 +641,7 @@ class TTestActorSystem {
641641
info->ExecutorThread->DropUnregistered();
642642

643643
// drop the mailbox if no actors remain there
644-
if (mbox.Header.IsEmpty()) {
644+
if (mbox.IsEmpty()) {
645645
Mailboxes.erase(mboxIt);
646646
}
647647
return true;
@@ -690,19 +690,19 @@ class TTestActorSystem {
690690
TMailboxInfo& mbox = it->second;
691691

692692
// update stats
693-
const auto nameIt = ActorName.find(mbox.Header.FindActor(actorId.LocalId()));
693+
const auto nameIt = ActorName.find(mbox.FindActor(actorId.LocalId()));
694694
Y_ABORT_UNLESS(nameIt != ActorName.end());
695695
++ActorStats[nameIt->second].Destroyed;
696696
ActorName.erase(nameIt);
697697

698698
// unregister actor through the executor
699-
info->ExecutorThread->UnregisterActor(&mbox.Header, actorId);
699+
info->ExecutorThread->UnregisterActor(&mbox, actorId);
700700

701701
// terminate unregistered actor
702702
info->ExecutorThread->DropUnregistered();
703703

704704
// delete mailbox if empty
705-
if (mbox.Header.IsEmpty()) {
705+
if (mbox.IsEmpty()) {
706706
Mailboxes.erase(actorId);
707707
}
708708
}

ydb/library/actors/core/actor.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,15 @@ namespace NActors {
134134
TActorId TActivationContext::RegisterWithSameMailbox(IActor* actor, TActorId parentId) {
135135
Y_DEBUG_ABORT_UNLESS(parentId);
136136
auto& ctx = *TlsActivationContext;
137-
return ctx.ExecutorThread.RegisterActor(actor, &ctx.Mailbox, parentId.Hint(), parentId);
137+
return ctx.ExecutorThread.RegisterActor(actor, &ctx.Mailbox, parentId);
138138
}
139139

140140
TActorId TActorContext::RegisterWithSameMailbox(IActor* actor) const {
141-
return ExecutorThread.RegisterActor(actor, &Mailbox, SelfID.Hint(), SelfID);
141+
return ExecutorThread.RegisterActor(actor, &Mailbox, SelfID);
142142
}
143143

144144
TActorId IActor::RegisterWithSameMailbox(IActor* actor) const noexcept {
145-
return TlsActivationContext->ExecutorThread.RegisterActor(actor, &TlsActivationContext->Mailbox, SelfActorId.Hint(), SelfActorId);
145+
return TlsActivationContext->ExecutorThread.RegisterActor(actor, &TlsActivationContext->Mailbox, SelfActorId);
146146
}
147147

148148
TActorId TActivationContext::InterconnectProxy(ui32 destinationNodeId) {
@@ -320,22 +320,22 @@ namespace NActors {
320320
}
321321
}
322322

323-
template TActorId TGenericExecutorThread::RegisterActor<ESendingType::Common>(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId);
324-
template TActorId TGenericExecutorThread::RegisterActor<ESendingType::Lazy>(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId);
325-
template TActorId TGenericExecutorThread::RegisterActor<ESendingType::Tail>(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId);
323+
template TActorId TGenericExecutorThread::RegisterActor<ESendingType::Common>(IActor* actor, TMailbox* mailbox, TActorId parentId);
324+
template TActorId TGenericExecutorThread::RegisterActor<ESendingType::Lazy>(IActor* actor, TMailbox* mailbox, TActorId parentId);
325+
template TActorId TGenericExecutorThread::RegisterActor<ESendingType::Tail>(IActor* actor, TMailbox* mailbox, TActorId parentId);
326326

327327
template <ESendingType SendingType>
328-
TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
328+
TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailbox* mailbox, TActorId parentId) {
329329
if (!parentId) {
330330
parentId = CurrentRecipient;
331331
}
332332
if constexpr (SendingType == ESendingType::Common) {
333-
return Ctx.Executor->Register(actor, mailbox, hint, parentId);
333+
return Ctx.Executor->Register(actor, mailbox, parentId);
334334
} else if (!TlsActivationContext) {
335-
return Ctx.Executor->Register(actor, mailbox, hint, parentId);
335+
return Ctx.Executor->Register(actor, mailbox, parentId);
336336
} else {
337337
ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
338-
TActorId id = Ctx.Executor->Register(actor, mailbox, hint, parentId);
338+
TActorId id = Ctx.Executor->Register(actor, mailbox, parentId);
339339
TlsThreadContext->SendingType = previousType;
340340
return id;
341341
}

ydb/library/actors/core/actor.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "monotonic.h"
66

77
#include <ydb/library/actors/actor_type/indexes.h>
8+
#include <ydb/library/actors/util/datetime.h>
89
#include <ydb/library/actors/util/local_process_key.h>
910

1011
#include <util/system/tls.h>
@@ -13,7 +14,7 @@
1314
namespace NActors {
1415
class TActorSystem;
1516
class TMailboxTable;
16-
struct TMailboxHeader;
17+
class TMailbox;
1718

1819
class TGenericExecutorThread;
1920
class IActor;
@@ -42,12 +43,12 @@ namespace NActors {
4243

4344
struct TActivationContext {
4445
public:
45-
TMailboxHeader& Mailbox;
46+
TMailbox& Mailbox;
4647
TGenericExecutorThread& ExecutorThread;
4748
const NHPTimer::STime EventStart;
4849

4950
protected:
50-
explicit TActivationContext(TMailboxHeader& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart)
51+
explicit TActivationContext(TMailbox& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart)
5152
: Mailbox(mailbox)
5253
, ExecutorThread(executorThread)
5354
, EventStart(eventStart)
@@ -135,7 +136,7 @@ namespace NActors {
135136
struct TActorContext: public TActivationContext {
136137
const TActorId SelfID;
137138
using TEventFlags = IEventHandle::TEventFlags;
138-
explicit TActorContext(TMailboxHeader& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
139+
explicit TActorContext(TMailbox& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
139140
: TActivationContext(mailbox, executorThread, eventStart)
140141
, SelfID(selfID)
141142
{

ydb/library/actors/core/defs.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ namespace NActors {
4747
HTSwap = 2, // other simple lf queue, suggested for low-contention case
4848
ReadAsFilled = 3, // wait-free queue, suggested for high-contention or latency critical
4949
TinyReadAsFilled = 4, // same as 3 but with lower overhead
50+
LockFreeIntrusive = 5, // lock-free intrusive mailbox
5051
//Inplace;
5152
//Direct;
5253
//Virtual

ydb/library/actors/core/event.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ namespace NActors {
6565
public:
6666
typedef TAutoPtr<IEventHandle> TPtr;
6767

68+
public:
69+
// Used by a mailbox intrusive list
70+
std::atomic<uintptr_t> NextLinkPtr;
71+
6872
public:
6973
template <typename TEv>
7074
inline TEv* CastAsLocal() const noexcept {

0 commit comments

Comments
 (0)