Skip to content

Commit 9a34952

Browse files
committed
[GR-19220] Add timeout argument to Thread::SizedQueue#pop (#3046)
PullRequest: truffleruby/3877
2 parents 816619e + 3d78334 commit 9a34952

File tree

15 files changed

+161
-45
lines changed

15 files changed

+161
-45
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Compatibility:
2323
* Add `String#byteindex` and `String#byterindex` (#3039, @itarato).
2424
* Add implementations of `rb_proc_call_with_block`, `rb_proc_call_kw`, `rb_proc_call_with_block_kw` and `rb_funcall_with_block_kw` (#3068, @andrykonchin).
2525
* Add optional `timeout` argument to `Thread::Queue#pop` (#3039, @itarato).
26+
* Add optional `timeout` argument to `Thread::SizedsQueue#pop` (#3039, @itarato).
2627

2728
Performance:
2829

lib/truffle/thread.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,18 @@ def pop(non_block = false, timeout: nil)
3535
Primitive.queue_pop(
3636
self,
3737
Primitive.as_boolean(non_block),
38-
Truffle::QueueOperations.validate_and_prepare_timeout(non_block, timeout))
38+
Truffle::QueueOperations.validate_and_prepare_timeout_in_milliseconds(non_block, timeout))
39+
end
40+
alias_method :shift, :pop
41+
alias_method :deq, :pop
42+
end
43+
44+
class SizedQueue
45+
def pop(non_block = false, timeout: nil)
46+
Primitive.sized_queue_pop(
47+
self,
48+
Primitive.as_boolean(non_block),
49+
Truffle::QueueOperations.validate_and_prepare_timeout_in_milliseconds(non_block, timeout))
3950
end
4051
alias_method :shift, :pop
4152
alias_method :deq, :pop

spec/ruby/core/sizedqueue/deq_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "SizedQueue#deq" do
56
it_behaves_like :queue_deq, :deq, -> { SizedQueue.new(10) }
67
end
8+
9+
describe "SizedQueue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(10); q.push(1); q.deq(timeout: v) }
12+
end
13+
end

spec/ruby/core/sizedqueue/pop_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "SizedQueue#pop" do
56
it_behaves_like :queue_deq, :pop, -> { SizedQueue.new(10) }
67
end
8+
9+
describe "SizedQueue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(10); q.push(1); q.pop(timeout: v) }
12+
end
13+
end
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "SizedQueue#shift" do
56
it_behaves_like :queue_deq, :shift, -> { SizedQueue.new(10) }
67
end
8+
9+
describe "SizedQueue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(10); q.push(1); q.shift(timeout: v) }
12+
end
13+
end

spec/ruby/shared/queue/deque.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@
123123
"can't set a timeout if non_block is enabled",
124124
)
125125
end
126+
127+
it "returns nil for a closed empty queue" do
128+
q = @object.call
129+
q.close
130+
q.send(@method, timeout: 0).should == nil
131+
end
126132
end
127133
end
128134

spec/truffleruby.next-specs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ spec/ruby/core/string/byterindex_spec.rb
2525
spec/ruby/core/queue/deq_spec.rb
2626
spec/ruby/core/queue/pop_spec.rb
2727
spec/ruby/core/queue/shift_spec.rb
28+
29+
spec/ruby/core/sizedqueue/deq_spec.rb
30+
spec/ruby/core/sizedqueue/pop_spec.rb
31+
spec/ruby/core/sizedqueue/shift_spec.rb

src/main/.checkstyle_checks.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@
149149
<property name="format" value="\.computeIfAbsent"/>
150150
<property name="message" value="Use ConcurrentOperations.getOrCompute() instead of ConcurrentHashMap.computeIfAbsent() which does not scale"/>
151151
</module>
152+
<module name="RegexpSinglelineJava">
153+
<property name="format" value="\b(?!\w*[Ll]atch)\w+\.await\(.*\)"/>
154+
<property name="message" value="Use ConcurrentOperations.awaitAndCheckInterrupt() instead"/>
155+
</module>
152156
<module name="RegexpSinglelineJava">
153157
<property name="format" value="@NodeChildren"/>
154158
<property name="message" value="Do not use @NodeChildren, use multiple @Child annotations directly"/>

src/main/java/org/truffleruby/collections/ConcurrentOperations.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313

1414
import java.util.Map;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
18+
import java.util.concurrent.locks.Condition;
19+
import java.util.concurrent.locks.ReentrantLock;
1620
import java.util.function.Function;
1721

1822
public abstract class ConcurrentOperations {
@@ -43,4 +47,40 @@ public static <K, V> boolean replace(Map<K, V> map, K key, V oldValue, V newValu
4347
return oldValue == null ? map.putIfAbsent(key, newValue) == null : map.replace(key, oldValue, newValue);
4448
}
4549

50+
/** {@link Condition#await()} might return and not throw InterruptedException if there was both a signal and an
51+
* interrupt before the lock could be re-acquired by await(). We always want the InterruptedException if both a
52+
* signal and an interrupt happen.
53+
* <hr>
54+
* {@link Condition#await()} when it sees both a signal and a {@link Thread#interrupt()} has to decide whether it
55+
* prefers the signal or the interrupt. For a {@link ReentrantLock#newCondition()} condition, the implementation in
56+
* {@link AbstractQueuedSynchronizer.ConditionObject#await()} sometimes throws InterruptedException and sometimes
57+
* returns, depending on which happens first. Because await() needs to reacquire the lock, even if the
58+
* InterruptedException happens before it might still become a situation of "both signal and interrupt". If it
59+
* prefers the signal and just returns, it does {@code Thread.currentThread().interrupt()} to let us know there was
60+
* an interrupt too. In any case, if there was any interrupt we want to throw InterruptedException, regardless of
61+
* what the implementation prefers. */
62+
public static void awaitAndCheckInterrupt(Condition condition) throws InterruptedException {
63+
// Checkstyle: stop
64+
condition.await();
65+
// Checkstyle: resume
66+
67+
if (Thread.interrupted()) {
68+
throw new InterruptedException();
69+
}
70+
}
71+
72+
/** See {@link ConcurrentOperations#awaitAndCheckInterrupt(java.util.concurrent.locks.Condition)} */
73+
public static boolean awaitAndCheckInterrupt(Condition condition, long time, TimeUnit unit)
74+
throws InterruptedException {
75+
// Checkstyle: stop
76+
boolean value = condition.await(time, unit);
77+
// Checkstyle: resume
78+
79+
if (Thread.interrupted()) {
80+
throw new InterruptedException();
81+
}
82+
83+
return value;
84+
}
85+
4686
}

src/main/java/org/truffleruby/core/mutex/ConditionVariableNodes.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.truffleruby.annotations.CoreModule;
2222
import org.truffleruby.annotations.Primitive;
2323
import org.truffleruby.builtins.PrimitiveArrayArgumentsNode;
24+
import org.truffleruby.collections.ConcurrentOperations;
2425
import org.truffleruby.collections.Memo;
2526
import org.truffleruby.core.klass.RubyClass;
2627
import org.truffleruby.core.thread.RubyThread;
@@ -98,7 +99,7 @@ private void waitInternal(RubyConditionVariable conditionVariable, ReentrantLock
9899

99100
// condLock must be locked before unlocking mutexLock, to avoid losing potential signals.
100101
// We must not change the Ruby Thread status and not consume a Java thread interrupt while locking condLock.
101-
// If there is an interrupt, it should be consumed by condition.await() and the Ruby Thread sleep status
102+
// If there is an interrupt, it should be consumed by Condition#await() and the Ruby Thread sleep status
102103
// must imply being ready to be interrupted by Thread#{run,wakeup}.
103104
condLock.lock();
104105
int holdCount = 0; // can be > 1 for MonitorMixin
@@ -164,9 +165,10 @@ private void awaitSignal(RubyConditionVariable self, RubyThread thread, long dur
164165
* ReentrantLock exposed to Ruby, e.g., via a Ruby Mutex, as that could be held forever by
165166
* another Ruby thread (which did {code mutex.lock} after the #wait), and we would never be able
166167
* to interrupt both threads at the same time for a synchronous ThreadLocalAction). */
167-
condition.await(endNanoTime - currentTime, TimeUnit.NANOSECONDS);
168+
ConcurrentOperations.awaitAndCheckInterrupt(condition, endNanoTime - currentTime,
169+
TimeUnit.NANOSECONDS);
168170
} else {
169-
condition.await();
171+
ConcurrentOperations.awaitAndCheckInterrupt(condition);
170172
}
171173
if (consumeSignal(self)) {
172174
return BlockingAction.SUCCESS;

0 commit comments

Comments
 (0)