Skip to content

Commit 6b567e3

Browse files
committed
YT-24537: Prioritize writers in TReaderWriterSpinLock, rename old version to TWriterStarvingRWLock
Previously, it was possible that `TReaderWriterSpinLock` wouldn't let the writer through if there's a steady flow of readers. This change addresses that by: 1. Prioritizing writers inside the spinlock by adding an additional `WriterReady` flag that writers set on arrival. This flag doesn't allow any readers to come through. 2. Adding the proper tests to verify this functionality, as well as spinlock's behaviour under forks. 3. Clarifying the documentation about spinlock guarantees 4. Adding a TLA+ model, formally specifying and verifying the guarantees of the new spinlock. 5. Renaming the old lock to `TWriterStarvingRWSpinLock`, and replacing all usages inside YT with the new version (renaming all usages outside of YT to the WriterStarving version). This is a second attempt of REVIEW: 8233768, the first one was rolled back as it lead to deadlocks in user code with reentrant reader locks: the case of `AcquireReader(thread0) -> AcquireWriter(thread1) -> AcquireReader(thread0)` is a deadlock, as `thread0` will not be able to acquire the lock (for the second time) before `thread1` frees writer lock, and `thread1` will not be able to acquire writer lock before the reader lock will be released by `thread0`, which won't happen until `thread0` acquires the lock for the second time. See/for more context and a real example of such situation. Analogous problem can happen with fibers: this is why you shouldn't allow context switches under the lock. Wondering why this ugly name `WriterStarvingRWSpinLock` appeared in your beautiful code? No worries, if you are **sure** that you don't use reentrant locks or fiber switches under the lock, you can freely replace your usage with the new `ReaderWriterSpinLock`. The replacement is drop-in. [nodiff:caesar] commit_hash:97683f854defca00cc283f5a2a10a1730b3c9174
1 parent b2a08a1 commit 6b567e3

10 files changed

+521
-20
lines changed

library/cpp/yt/threading/atomic_object.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include <library/cpp/yt/threading/rw_spin_lock.h>
3+
#include <library/cpp/yt/threading/writer_starving_rw_spin_lock.h>
44

55
#include <concepts>
66

library/cpp/yt/threading/rw_spin_lock-inl.h

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ inline void TReaderWriterSpinLock::AcquireReaderForkFriendly() noexcept
3131
inline void TReaderWriterSpinLock::ReleaseReader() noexcept
3232
{
3333
auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release);
34-
Y_ASSERT((prevValue & ~WriterMask) != 0);
34+
Y_ASSERT((prevValue & ~(WriterMask | WriterReadyMask)) != 0);
3535
NDetail::RecordSpinLockReleased();
3636
}
3737

@@ -45,14 +45,14 @@ inline void TReaderWriterSpinLock::AcquireWriter() noexcept
4545

4646
inline void TReaderWriterSpinLock::ReleaseWriter() noexcept
4747
{
48-
auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release);
48+
auto prevValue = Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release);
4949
Y_ASSERT(prevValue & WriterMask);
5050
NDetail::RecordSpinLockReleased();
5151
}
5252

5353
inline bool TReaderWriterSpinLock::IsLocked() const noexcept
5454
{
55-
return Value_.load() != UnlockedValue;
55+
return (Value_.load() & ~WriterReadyMask) != 0;
5656
}
5757

5858
inline bool TReaderWriterSpinLock::IsLockedByReader() const noexcept
@@ -68,7 +68,7 @@ inline bool TReaderWriterSpinLock::IsLockedByWriter() const noexcept
6868
inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
6969
{
7070
auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire);
71-
if ((oldValue & WriterMask) != 0) {
71+
if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
7272
Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed);
7373
return false;
7474
}
@@ -79,7 +79,7 @@ inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
7979
inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
8080
{
8181
auto oldValue = Value_.load(std::memory_order::relaxed);
82-
if ((oldValue & WriterMask) != 0) {
82+
if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
8383
return false;
8484
}
8585
return TryAcquireReader();
@@ -88,7 +88,7 @@ inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
8888
inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
8989
{
9090
auto oldValue = Value_.load(std::memory_order::relaxed);
91-
if ((oldValue & WriterMask) != 0) {
91+
if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
9292
return false;
9393
}
9494
auto newValue = oldValue + ReaderDelta;
@@ -98,22 +98,35 @@ inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
9898
return acquired;
9999
}
100100

101-
inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
101+
inline bool TReaderWriterSpinLock::TryAcquireWriterWithExpectedValue(TValue expected) noexcept
102102
{
103-
auto expected = UnlockedValue;
104-
105-
bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
103+
bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
106104
NDetail::RecordSpinLockAcquired(acquired);
107105
return acquired;
108106
}
109107

108+
inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
109+
{
110+
// NB(pavook): we cannot expect writer ready to be set, as this method
111+
// might be called without indicating writer readiness and we cannot
112+
// indicate readiness on the hot path. This means that code calling
113+
// TryAcquireWriter will spin against code calling AcquireWriter.
114+
return TryAcquireWriterWithExpectedValue(UnlockedValue);
115+
}
116+
110117
inline bool TReaderWriterSpinLock::TryAndTryAcquireWriter() noexcept
111118
{
112119
auto oldValue = Value_.load(std::memory_order::relaxed);
113-
if (oldValue != UnlockedValue) {
120+
121+
if ((oldValue & WriterReadyMask) == 0) {
122+
oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed);
123+
}
124+
125+
if ((oldValue & (~WriterReadyMask)) != 0) {
114126
return false;
115127
}
116-
return TryAcquireWriter();
128+
129+
return TryAcquireWriterWithExpectedValue(WriterReadyMask);
117130
}
118131

119132
////////////////////////////////////////////////////////////////////////////////

library/cpp/yt/threading/rw_spin_lock.h

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,23 @@ namespace NYT::NThreading {
1616

1717
//! Single-writer multiple-readers spin lock.
1818
/*!
19-
* Reader-side calls are pretty cheap.
20-
* The lock is unfair.
19+
* Reader-side acquires are pretty cheap, and readers don't spin unless writers
20+
* are present.
21+
*
22+
* The lock is unfair, but writers are prioritized over readers, that is,
23+
* if AcquireWriter() is called at some time, then some writer
24+
* (not necessarily the same one that called AcquireWriter) will succeed
25+
* in the next time. This is implemented by an additional flag "WriterReady",
26+
* that writers set on arrival. No readers can proceed until this flag is reset.
27+
*
28+
* WARNING: You probably should not use this lock if forks are possible: see
29+
* fork_aware_rw_spin_lock.h for a proper fork-safe lock which does the housekeeping for you.
30+
*
31+
* WARNING: This lock is not recursive: you can't call AcquireReader() twice in the same
32+
* thread, as that may lead to a deadlock. For the same reason you shouldn't do WaitFor or any
33+
* other context switch under lock.
34+
*
35+
* See tla+/spinlock.tla for the formally verified lock's properties.
2136
*/
2237
class TReaderWriterSpinLock
2338
: public TSpinLockBase
@@ -29,18 +44,26 @@ class TReaderWriterSpinLock
2944
/*!
3045
* Optimized for the case of read-intensive workloads.
3146
* Cheap (just one atomic increment and no spinning if no writers are present).
32-
* Don't use this call if forks are possible: forking at some
47+
*
48+
* WARNING: Don't use this call if forks are possible: forking at some
3349
* intermediate point inside #AcquireReader may corrupt the lock state and
34-
* leave lock forever stuck for the child process.
50+
* leave the lock stuck forever for the child process.
51+
*
52+
* WARNING: The lock is not recursive/reentrant, i.e. it assumes that no thread calls
53+
* AcquireReader() if the reader is already acquired for it.
3554
*/
3655
void AcquireReader() noexcept;
3756
//! Acquires the reader lock.
3857
/*!
3958
* A more expensive version of #AcquireReader (includes at least
4059
* one atomic load and CAS; also may spin even if just readers are present).
60+
*
4161
* In contrast to #AcquireReader, this method can be used in the presence of forks.
42-
* Note that fork-friendliness alone does not provide fork-safety: additional
43-
* actions must be performed to release the lock after a fork.
62+
*
63+
* WARNING: fork-friendliness alone does not provide fork-safety: additional
64+
* actions must be performed to release the lock after a fork. This means you
65+
* probably should NOT use this lock in the presence of forks, consider
66+
* fork_aware_rw_spin_lock.h instead as a proper fork-safe lock.
4467
*/
4568
void AcquireReaderForkFriendly() noexcept;
4669
//! Tries acquiring the reader lock; see #AcquireReader.
@@ -94,10 +117,12 @@ class TReaderWriterSpinLock
94117
using TValue = ui32;
95118
static constexpr TValue UnlockedValue = 0;
96119
static constexpr TValue WriterMask = 1;
97-
static constexpr TValue ReaderDelta = 2;
120+
static constexpr TValue WriterReadyMask = 2;
121+
static constexpr TValue ReaderDelta = 4;
98122

99123
std::atomic<TValue> Value_ = UnlockedValue;
100124

125+
bool TryAcquireWriterWithExpectedValue(TValue expected) noexcept;
101126

102127
bool TryAndTryAcquireReader() noexcept;
103128
bool TryAndTryAcquireWriter() noexcept;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#include <library/cpp/testing/gtest/gtest.h>
2+
3+
#include <library/cpp/yt/threading/rw_spin_lock.h>
4+
5+
#include <util/thread/pool.h>
6+
7+
#include <latch>
8+
#include <thread>
9+
10+
namespace NYT::NThreading {
11+
namespace {
12+
13+
////////////////////////////////////////////////////////////////////////////////
14+
15+
TEST(TReaderWriterSpinLockTest, WriterPriority)
16+
{
17+
int readerThreads = 10;
18+
std::latch latch(readerThreads + 1);
19+
std::atomic<size_t> finishedCount = {0};
20+
21+
TReaderWriterSpinLock lock;
22+
23+
volatile std::atomic<ui32> x = {0};
24+
25+
auto readerTask = [&latch, &lock, &finishedCount, &x] () {
26+
latch.arrive_and_wait();
27+
while (true) {
28+
{
29+
auto guard = ReaderGuard(lock);
30+
// do some stuff
31+
for (ui32 i = 0; i < 10'000u; ++i) {
32+
x.fetch_add(i);
33+
}
34+
}
35+
if (finishedCount.fetch_add(1) > 20'000) {
36+
break;
37+
}
38+
}
39+
};
40+
41+
auto readerPool = CreateThreadPool(readerThreads);
42+
for (int i = 0; i < readerThreads; ++i) {
43+
readerPool->SafeAddFunc(readerTask);
44+
}
45+
46+
latch.arrive_and_wait();
47+
while (finishedCount.load() == 0);
48+
auto guard = WriterGuard(lock);
49+
EXPECT_LE(finishedCount.load(), 1'000u);
50+
DoNotOptimizeAway(x);
51+
}
52+
53+
////////////////////////////////////////////////////////////////////////////////
54+
55+
} // namespace
56+
} // namespace NYT::NConcurrency
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#include <library/cpp/testing/gtest/gtest.h>
2+
3+
#include <library/cpp/yt/threading/rw_spin_lock.h>
4+
#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
5+
6+
#include <util/thread/pool.h>
7+
8+
#include <sys/wait.h>
9+
10+
namespace NYT::NThreading {
11+
namespace {
12+
13+
////////////////////////////////////////////////////////////////////////////////
14+
15+
TEST(TReaderWriterSpinLockTest, ForkFriendlyness)
16+
{
17+
std::atomic<bool> stopped = {false};
18+
YT_DECLARE_SPIN_LOCK(TReaderWriterSpinLock, lock);
19+
20+
auto readerTask = [&lock, &stopped] () {
21+
while (!stopped.load()) {
22+
ForkFriendlyReaderGuard(lock);
23+
}
24+
};
25+
26+
auto tryReaderTask = [&lock, &stopped] () {
27+
while (!stopped.load()) {
28+
// NB(pavook): TryAcquire instead of Acquire to minimize checks.
29+
bool acquired = lock.TryAcquireReaderForkFriendly();
30+
if (acquired) {
31+
lock.ReleaseReader();
32+
}
33+
}
34+
};
35+
36+
auto tryWriterTask = [&lock, &stopped] () {
37+
while (!stopped.load()) {
38+
Sleep(TDuration::MicroSeconds(1));
39+
bool acquired = lock.TryAcquireWriter();
40+
if (acquired) {
41+
lock.ReleaseWriter();
42+
}
43+
}
44+
};
45+
46+
auto writerTask = [&lock, &stopped] () {
47+
while (!stopped.load()) {
48+
Sleep(TDuration::MicroSeconds(1));
49+
WriterGuard(lock);
50+
}
51+
};
52+
53+
int readerCount = 20;
54+
int writerCount = 10;
55+
56+
auto reader = CreateThreadPool(readerCount);
57+
auto writer = CreateThreadPool(writerCount);
58+
59+
for (int i = 0; i < readerCount / 2; ++i) {
60+
reader->SafeAddFunc(readerTask);
61+
reader->SafeAddFunc(tryReaderTask);
62+
}
63+
for (int i = 0; i < writerCount / 2; ++i) {
64+
writer->SafeAddFunc(writerTask);
65+
writer->SafeAddFunc(tryWriterTask);
66+
}
67+
68+
// And let the chaos begin!
69+
int forkCount = 2000;
70+
for (int iter = 1; iter <= forkCount; ++iter) {
71+
pid_t pid;
72+
{
73+
auto guard = WriterGuard(lock);
74+
pid = fork();
75+
}
76+
77+
YT_VERIFY(pid >= 0);
78+
79+
// NB(pavook): check different orders to maximize chaos.
80+
if (iter % 2 == 0) {
81+
ReaderGuard(lock);
82+
}
83+
WriterGuard(lock);
84+
ReaderGuard(lock);
85+
if (pid == 0) {
86+
// NB(pavook): thread pools are no longer with us.
87+
_exit(0);
88+
}
89+
}
90+
91+
for (int i = 1; i <= forkCount; ++i) {
92+
int status;
93+
YT_VERIFY(waitpid(0, &status, 0) > 0);
94+
YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
95+
}
96+
97+
stopped.store(true);
98+
}
99+
100+
////////////////////////////////////////////////////////////////////////////////
101+
102+
TEST(TForkAwareSpinLockTest, ForkSafety)
103+
{
104+
std::atomic<bool> stopped = {false};
105+
YT_DECLARE_SPIN_LOCK(TForkAwareSpinLock, lock);
106+
107+
auto acquireTask = [&lock, &stopped] () {
108+
while (!stopped.load()) {
109+
Guard(lock);
110+
}
111+
};
112+
113+
// NB(pavook): TryAcquire instead of Acquire to minimize checks.
114+
auto tryAcquireTask = [&lock, &stopped] () {
115+
while (!stopped.load()) {
116+
bool acquired = lock.TryAcquire();
117+
if (acquired) {
118+
lock.Release();
119+
}
120+
}
121+
};
122+
123+
int workerCount = 20;
124+
125+
auto worker = CreateThreadPool(workerCount);
126+
127+
for (int i = 0; i < workerCount / 2; ++i) {
128+
worker->SafeAddFunc(acquireTask);
129+
worker->SafeAddFunc(tryAcquireTask);
130+
}
131+
132+
// And let the chaos begin!
133+
int forkCount = 2000;
134+
for (int iter = 1; iter <= forkCount; ++iter) {
135+
pid_t pid = fork();
136+
137+
YT_VERIFY(pid >= 0);
138+
139+
Guard(lock);
140+
Guard(lock);
141+
142+
if (pid == 0) {
143+
// NB(pavook): thread pools are no longer with us.
144+
_exit(0);
145+
}
146+
}
147+
148+
for (int i = 1; i <= forkCount; ++i) {
149+
int status;
150+
YT_VERIFY(waitpid(0, &status, 0) > 0);
151+
YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
152+
}
153+
154+
stopped.store(true);
155+
}
156+
157+
////////////////////////////////////////////////////////////////////////////////
158+
159+
} // namespace
160+
} // namespace NYT::NConcurrency

0 commit comments

Comments
 (0)