Skip to content

Commit 7f431aa

Browse files
committed
Clarify why we need to check Thread#interrupted after Condition#await
* Use the new helpers for all Condition#await.
1 parent eb340d7 commit 7f431aa

File tree

5 files changed

+58
-32
lines changed

5 files changed

+58
-32
lines changed

src/main/.checkstyle_checks.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@
149149
<property name="format" value="\.computeIfAbsent"/>
150150
<property name="message" value="Use ConcurrentOperations.getOrCompute() instead of ConcurrentHashMap.computeIfAbsent() which does not scale"/>
151151
</module>
152+
<module name="RegexpSinglelineJava">
153+
<property name="format" value="\b(?!\w*[Ll]atch)\w+\.await\(.*\)"/>
154+
<property name="message" value="Use ConcurrentOperations.awaitAndCheckInterrupt() instead"/>
155+
</module>
152156
<module name="RegexpSinglelineJava">
153157
<property name="format" value="@NodeChildren"/>
154158
<property name="message" value="Do not use @NodeChildren, use multiple @Child annotations directly"/>

src/main/java/org/truffleruby/collections/ConcurrentOperations.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313

1414
import java.util.Map;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
18+
import java.util.concurrent.locks.Condition;
19+
import java.util.concurrent.locks.ReentrantLock;
1620
import java.util.function.Function;
1721

1822
public abstract class ConcurrentOperations {
@@ -43,4 +47,40 @@ public static <K, V> boolean replace(Map<K, V> map, K key, V oldValue, V newValu
4347
return oldValue == null ? map.putIfAbsent(key, newValue) == null : map.replace(key, oldValue, newValue);
4448
}
4549

50+
/** {@link Condition#await()} might return and not throw InterruptedException if there was both a signal and an
51+
* interrupt before the lock could be re-acquired by await(). We always want the InterruptedException if both a
52+
* signal and an interrupt happen.
53+
* <hr>
54+
* {@link Condition#await()} when it sees both a signal and a {@link Thread#interrupt()} has to decide whether it
55+
* prefers the signal or the interrupt. For a {@link ReentrantLock#newCondition()} condition, the implementation in
56+
* {@link AbstractQueuedSynchronizer.ConditionObject#await()} sometimes throws InterruptedException and sometimes
57+
* returns, depending on which happens first. Because await() needs to reacquire the lock, even if the
58+
* InterruptedException happens before it might still become a situation of "both signal and interrupt". If it
59+
* prefers the signal and just returns, it does {@code Thread.currentThread().interrupt()} to let us know there was
60+
* an interrupt too. In any case, if there was any interrupt we want to throw InterruptedException, regardless of
61+
* what the implementation prefers. */
62+
public static void awaitAndCheckInterrupt(Condition condition) throws InterruptedException {
63+
// Checkstyle: stop
64+
condition.await();
65+
// Checkstyle: resume
66+
67+
if (Thread.interrupted()) {
68+
throw new InterruptedException();
69+
}
70+
}
71+
72+
/** See {@link ConcurrentOperations#awaitAndCheckInterrupt(java.util.concurrent.locks.Condition)} */
73+
public static boolean awaitAndCheckInterrupt(Condition condition, long time, TimeUnit unit)
74+
throws InterruptedException {
75+
// Checkstyle: stop
76+
boolean value = condition.await(time, unit);
77+
// Checkstyle: resume
78+
79+
if (Thread.interrupted()) {
80+
throw new InterruptedException();
81+
}
82+
83+
return value;
84+
}
85+
4686
}

src/main/java/org/truffleruby/core/mutex/ConditionVariableNodes.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.truffleruby.annotations.CoreModule;
2222
import org.truffleruby.annotations.Primitive;
2323
import org.truffleruby.builtins.PrimitiveArrayArgumentsNode;
24+
import org.truffleruby.collections.ConcurrentOperations;
2425
import org.truffleruby.collections.Memo;
2526
import org.truffleruby.core.klass.RubyClass;
2627
import org.truffleruby.core.thread.RubyThread;
@@ -98,7 +99,7 @@ private void waitInternal(RubyConditionVariable conditionVariable, ReentrantLock
9899

99100
// condLock must be locked before unlocking mutexLock, to avoid losing potential signals.
100101
// We must not change the Ruby Thread status and not consume a Java thread interrupt while locking condLock.
101-
// If there is an interrupt, it should be consumed by condition.await() and the Ruby Thread sleep status
102+
// If there is an interrupt, it should be consumed by Condition#await() and the Ruby Thread sleep status
102103
// must imply being ready to be interrupted by Thread#{run,wakeup}.
103104
condLock.lock();
104105
int holdCount = 0; // can be > 1 for MonitorMixin
@@ -164,9 +165,10 @@ private void awaitSignal(RubyConditionVariable self, RubyThread thread, long dur
164165
* ReentrantLock exposed to Ruby, e.g., via a Ruby Mutex, as that could be held forever by
165166
* another Ruby thread (which did {code mutex.lock} after the #wait), and we would never be able
166167
* to interrupt both threads at the same time for a synchronous ThreadLocalAction). */
167-
condition.await(endNanoTime - currentTime, TimeUnit.NANOSECONDS);
168+
ConcurrentOperations.awaitAndCheckInterrupt(condition, endNanoTime - currentTime,
169+
TimeUnit.NANOSECONDS);
168170
} else {
169-
condition.await();
171+
ConcurrentOperations.awaitAndCheckInterrupt(condition);
170172
}
171173
if (consumeSignal(self)) {
172174
return BlockingAction.SUCCESS;

src/main/java/org/truffleruby/core/queue/SizedQueue.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.concurrent.locks.ReentrantLock;
1717

1818
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
19+
import org.truffleruby.collections.ConcurrentOperations;
1920

2021
public class SizedQueue {
2122

@@ -112,7 +113,7 @@ public boolean put(Object item) throws InterruptedException {
112113
return false;
113114
}
114115

115-
canAdd.await();
116+
ConcurrentOperations.awaitAndCheckInterrupt(canAdd);
116117
}
117118

118119
if (closed) {
@@ -144,17 +145,8 @@ public Object poll(long timeoutMilliseconds) throws InterruptedException {
144145

145146
try {
146147
if (size == 0) {
147-
final boolean signalled = canTake.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
148-
149-
/* We need to check the interrupted flag here, while holding the lock and after the await(). Otherwise,
150-
* if another thread does {@code th.raise(exc); q.push(1)} like in core/thread/handle_interrupt_spec.rb
151-
* then we might miss the ThreadLocalAction and instead return, which would incorrectly delay the
152-
* ThreadLocalAction if under {@code Thread.handle_interrupt(Object => :on_blocking)}. The other thread
153-
* cannot wait on the Future, as that might block arbitrary long when side-effects are disabled on this
154-
* thread. */
155-
if (Thread.interrupted()) {
156-
throw new InterruptedException();
157-
}
148+
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, timeoutMilliseconds,
149+
TimeUnit.MILLISECONDS);
158150

159151
if (!signalled) {
160152
// Timed out.
@@ -197,7 +189,7 @@ public Object take() throws InterruptedException {
197189
return CLOSED;
198190
}
199191

200-
canTake.await();
192+
ConcurrentOperations.awaitAndCheckInterrupt(canTake);
201193
}
202194

203195
return doTake();

src/main/java/org/truffleruby/core/queue/UnsizedQueue.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.locks.ReentrantLock;
1818

1919
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
20+
import org.truffleruby.collections.ConcurrentOperations;
2021

2122
public class UnsizedQueue {
2223

@@ -95,17 +96,7 @@ public Object take() throws InterruptedException {
9596
return CLOSED;
9697
}
9798

98-
canTake.await();
99-
100-
/* We need to check the interrupted flag here, while holding the lock and after the await(). Otherwise,
101-
* if another thread does {@code th.raise(exc); q.push(1)} like in core/thread/handle_interrupt_spec.rb
102-
* then we might miss the ThreadLocalAction and instead return, which would incorrectly delay the
103-
* ThreadLocalAction if under {@code Thread.handle_interrupt(Object => :on_blocking)}. The other thread
104-
* cannot wait on the Future, as that might block arbitrary long when side-effects are disabled on this
105-
* thread. */
106-
if (Thread.interrupted()) {
107-
throw new InterruptedException();
108-
}
99+
ConcurrentOperations.awaitAndCheckInterrupt(canTake);
109100
}
110101

111102
return doTake();
@@ -120,11 +111,8 @@ public Object poll(long timeoutMilliseconds) throws InterruptedException {
120111

121112
try {
122113
if (takeEnd == null) {
123-
final boolean signalled = canTake.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
124-
125-
if (Thread.interrupted()) { // See comment of take()
126-
throw new InterruptedException();
127-
}
114+
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, timeoutMilliseconds,
115+
TimeUnit.MILLISECONDS);
128116

129117
if (!signalled) {
130118
return null;

0 commit comments

Comments
 (0)