Skip to content

Commit 17e4756

Browse files
author
cherepashka
committed
YT-24270: Added overdraft for TAsyncSemaphore
* Changelog entry Type: feature Component: core Add slots overdraft into TAsyncSemaphore commit_hash:3a25c8c48546671fece656d75b2b5e4d37f829cd
1 parent 6b8cf18 commit 17e4756

File tree

3 files changed

+108
-1
lines changed

3 files changed

+108
-1
lines changed

yt/yt/core/concurrency/async_semaphore.cpp

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ TAsyncSemaphore::TAsyncSemaphore(i64 totalSlots)
1111
YT_VERIFY(TotalSlots_ >= 0);
1212
}
1313

14+
TAsyncSemaphore::TAsyncSemaphore(i64 totalSlots, bool enableOverdraft)
15+
: TotalSlots_(totalSlots)
16+
, FreeSlots_(totalSlots)
17+
, EnableOverdraft_(enableOverdraft)
18+
{
19+
YT_VERIFY(TotalSlots_ >= 0);
20+
}
21+
1422
void TAsyncSemaphore::SetTotal(i64 totalSlots)
1523
{
1624
YT_VERIFY(totalSlots >= 0);
@@ -33,6 +41,10 @@ void TAsyncSemaphore::Release(i64 slots)
3341
auto guard = WriterGuard(SpinLock_);
3442

3543
FreeSlots_ += slots;
44+
if (EnableOverdraft_) {
45+
FreeSlots_ = std::min(FreeSlots_, TotalSlots_);
46+
}
47+
3648
YT_VERIFY(FreeSlots_ <= TotalSlots_);
3749

3850
if (Releasing_) {
@@ -48,9 +60,17 @@ void TAsyncSemaphore::Release(i64 slots)
4860

4961
{
5062
auto guard = WriterGuard(SpinLock_);
63+
auto frontWaiterOverflowsSlots = [&] {
64+
return EnableOverdraft_ && !Waiters_.empty() && Waiters_.front().Slots > TotalSlots_ && FreeSlots_ == TotalSlots_;
65+
};
5166

52-
while (!Waiters_.empty() && FreeSlots_ >= Waiters_.front().Slots) {
67+
while (!Waiters_.empty() && FreeSlots_ >= Waiters_.front().Slots || frontWaiterOverflowsSlots()) {
5368
auto& waiter = Waiters_.front();
69+
if (frontWaiterOverflowsSlots()) {
70+
// For "fat" request we need to acquire all total slots in semaphore.
71+
YT_ASSERT(FreeSlots_ == TotalSlots_);
72+
waiter.Slots = FreeSlots_;
73+
}
5474
FreeSlots_ -= waiter.Slots;
5575
waitersToRelease.push_back(std::move(waiter));
5676
Waiters_.pop();
@@ -82,6 +102,10 @@ bool TAsyncSemaphore::Acquire(i64 slots)
82102
YT_VERIFY(slots >= 0);
83103

84104
auto guard = WriterGuard(SpinLock_);
105+
if (EnableOverdraft_) {
106+
slots = std::min(slots, TotalSlots_);
107+
}
108+
85109
FreeSlots_ -= slots;
86110

87111
return FreeSlots_ >= 0;
@@ -92,6 +116,10 @@ bool TAsyncSemaphore::TryAcquire(i64 slots)
92116
YT_VERIFY(slots >= 0);
93117

94118
auto guard = WriterGuard(SpinLock_);
119+
if (EnableOverdraft_) {
120+
slots = std::min(slots, TotalSlots_);
121+
}
122+
95123
if (FreeSlots_ < slots) {
96124
return false;
97125
}
@@ -104,6 +132,10 @@ TFuture<TAsyncSemaphoreGuard> TAsyncSemaphore::AsyncAcquire(i64 slots)
104132
YT_VERIFY(slots >= 0);
105133

106134
auto guard = WriterGuard(SpinLock_);
135+
if (EnableOverdraft_) {
136+
slots = std::min(slots, TotalSlots_);
137+
}
138+
107139
if (FreeSlots_ >= slots) {
108140
FreeSlots_ -= slots;
109141
return MakeFuture(TAsyncSemaphoreGuard(this, slots));

yt/yt/core/concurrency/async_semaphore.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class TAsyncSemaphore
5454
{
5555
public:
5656
explicit TAsyncSemaphore(i64 totalSlots);
57+
TAsyncSemaphore(i64 totalSlots, bool enableOverdraft);
5758

5859
//! Updates the total number of slots.
5960
void SetTotal(i64 totalSlots);
@@ -95,6 +96,7 @@ class TAsyncSemaphore
9596
i64 TotalSlots_;
9697
i64 FreeSlots_;
9798

99+
bool EnableOverdraft_ = false;
98100
bool Releasing_ = false;
99101

100102
TPromise<void> ReadyEvent_;

yt/yt/core/concurrency/unittests/async_semaphore_ut.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <yt/yt/core/concurrency/async_semaphore.h>
66
#include <yt/yt/core/concurrency/scheduler_api.h>
7+
#include <yt/yt/core/concurrency/thread_pool.h>
78

89
namespace NYT {
910
namespace {
@@ -26,6 +27,78 @@ TEST(TAsyncSemaphoreTest, CancelReadyEvent)
2627
EXPECT_TRUE(WaitFor(readyTwo).IsOK());
2728
}
2829

30+
TEST(TAsyncSemaphoreTest, OverdraftSlots)
31+
{
32+
constexpr static int ThreadCount = 4;
33+
constexpr static int RequestCount = 10;
34+
constexpr static int SemaphoreTotalSlots = 1;
35+
constexpr static int RequestWeight = 100;
36+
37+
auto threadPool = CreateThreadPool(ThreadCount, "SemaphoreAcqusition");
38+
auto semaphore = New<TAsyncSemaphore>(SemaphoreTotalSlots, /*enableOverdraft*/ true);
39+
40+
{
41+
std::vector<TFuture<void>> futures;
42+
auto barrierPromise = NewPromise<void>();
43+
for (int i = 0; i < RequestCount; ++i) {
44+
futures.push_back(BIND([
45+
&semaphore,
46+
barrierFuture = barrierPromise.ToFuture()
47+
] {
48+
WaitForFast(barrierFuture)
49+
.ThrowOnError();
50+
51+
WaitFor(semaphore->AsyncAcquire(RequestWeight).AsVoid())
52+
.ThrowOnError();
53+
})
54+
.AsyncVia(threadPool->GetInvoker())
55+
.Run());
56+
}
57+
58+
barrierPromise.Set();
59+
60+
WaitFor(AllSucceeded(std::move(futures)))
61+
.ThrowOnError();
62+
}
63+
64+
{
65+
semaphore->SetTotal(RequestWeight);
66+
std::vector<TFuture<void>> futures;
67+
auto barrierPromise = NewPromise<void>();
68+
for (int i = 0; i < RequestCount; ++i) {
69+
futures.push_back(BIND([
70+
&semaphore,
71+
barrierFuture = barrierPromise.ToFuture()
72+
] {
73+
WaitForFast(barrierFuture)
74+
.ThrowOnError();
75+
76+
WaitFor(semaphore->AsyncAcquire(RequestWeight).AsVoid())
77+
.ThrowOnError();
78+
})
79+
.AsyncVia(threadPool->GetInvoker())
80+
.Run());
81+
}
82+
83+
futures.push_back(BIND([
84+
&semaphore,
85+
barrierFuture = barrierPromise.ToFuture()
86+
] {
87+
WaitForFast(barrierFuture)
88+
.ThrowOnError();
89+
90+
semaphore->SetTotal(SemaphoreTotalSlots);
91+
})
92+
.AsyncVia(threadPool->GetInvoker())
93+
.Run());
94+
95+
barrierPromise.Set();
96+
97+
WaitFor(AllSucceeded(std::move(futures)))
98+
.ThrowOnError();
99+
}
100+
}
101+
29102
////////////////////////////////////////////////////////////////////////////////
30103

31104
} // namespace

0 commit comments

Comments
 (0)