Skip to content

Commit 0e4ec6e

Browse files
committed
[GR-19220] Add timeout to Thread::SizedQueue#push
PullRequest: truffleruby/3921
2 parents 2ea4d8d + 473f48e commit 0e4ec6e

File tree

10 files changed

+148
-20
lines changed

10 files changed

+148
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ Compatibility:
3333
* Fix `Encoding::Converter#primitive_convert` and raise `FrozenError` when a destination buffer argument is frozen (@andrykonchin).
3434
* Add `Module#undefined_instance_methods` (#3039, @itarato).
3535
* Add `Thread.each_caller_location` (#3039, @itarato).
36+
* Add `timeout` argument to `Thread::SizedQueue#push` (#3039, @itarato).
3637

3738
Performance:
3839

lib/truffle/thread.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,15 @@ def pop(non_block = false, timeout: nil)
5050
end
5151
alias_method :shift, :pop
5252
alias_method :deq, :pop
53+
54+
def push(value, non_block = false, timeout: nil)
55+
Primitive.sized_queue_push(
56+
self,
57+
value,
58+
Primitive.as_boolean(non_block),
59+
Truffle::QueueOperations.validate_and_prepare_timeout_in_milliseconds(non_block, timeout))
60+
end
61+
alias_method :<<, :push
62+
alias_method :enq, :push
5363
end
5464
end

spec/ruby/core/sizedqueue/append_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/enque'
33
require_relative '../../shared/sizedqueue/enque'
4+
require_relative '../../shared/types/rb_num2dbl_fails'
45

56
describe "SizedQueue#<<" do
67
it_behaves_like :queue_enq, :<<, -> { SizedQueue.new(10) }
@@ -9,3 +10,9 @@
910
describe "SizedQueue#<<" do
1011
it_behaves_like :sizedqueue_enq, :<<, -> n { SizedQueue.new(n) }
1112
end
13+
14+
describe "SizedQueue operations with timeout" do
15+
ruby_version_is "3.2" do
16+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(1); q.send(:<<, 1, timeout: v) }
17+
end
18+
end

spec/ruby/core/sizedqueue/enq_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/enque'
33
require_relative '../../shared/sizedqueue/enque'
4+
require_relative '../../shared/types/rb_num2dbl_fails'
45

56
describe "SizedQueue#enq" do
67
it_behaves_like :queue_enq, :enq, -> { SizedQueue.new(10) }
@@ -9,3 +10,9 @@
910
describe "SizedQueue#enq" do
1011
it_behaves_like :sizedqueue_enq, :enq, -> n { SizedQueue.new(n) }
1112
end
13+
14+
describe "SizedQueue operations with timeout" do
15+
ruby_version_is "3.2" do
16+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(1); q.enq(1, timeout: v) }
17+
end
18+
end

spec/ruby/core/sizedqueue/push_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/enque'
33
require_relative '../../shared/sizedqueue/enque'
4+
require_relative '../../shared/types/rb_num2dbl_fails'
45

56
describe "SizedQueue#push" do
67
it_behaves_like :queue_enq, :push, -> { SizedQueue.new(10) }
@@ -9,3 +10,9 @@
910
describe "SizedQueue#push" do
1011
it_behaves_like :sizedqueue_enq, :push, -> n { SizedQueue.new(n) }
1112
end
13+
14+
describe "SizedQueue operations with timeout" do
15+
ruby_version_is "3.2" do
16+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(1); q.push(1, timeout: v) }
17+
end
18+
end

spec/ruby/shared/sizedqueue/enque.rb

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
q << 1
3838

3939
t = Thread.new {
40-
-> { q.send(@method, 2) }.should raise_error(ClosedQueueError)
40+
-> { q.send(@method, 2) }.should raise_error(ClosedQueueError, "queue closed")
4141
}
4242

4343
Thread.pass until q.num_waiting == 1
@@ -108,6 +108,28 @@
108108
"can't set a timeout if non_block is enabled",
109109
)
110110
end
111+
112+
it "raise ClosedQueueError when closed before enqueued" do
113+
q = @object.call(1)
114+
q.close
115+
-> { q.send(@method, 2, timeout: 1) }.should raise_error(ClosedQueueError, "queue closed")
116+
end
117+
118+
it "interrupts enqueuing threads with ClosedQueueError when the queue is closed" do
119+
q = @object.call(1)
120+
q << 1
121+
122+
t = Thread.new {
123+
-> { q.send(@method, 1, timeout: 0.1) }.should raise_error(ClosedQueueError, "queue closed")
124+
}
125+
126+
Thread.pass until q.num_waiting == 1
127+
128+
q.close
129+
130+
t.join
131+
q.pop.should == 1
132+
end
111133
end
112134
end
113135
end

spec/truffleruby.next-specs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ spec/ruby/core/queue/shift_spec.rb
2929
spec/ruby/core/sizedqueue/deq_spec.rb
3030
spec/ruby/core/sizedqueue/pop_spec.rb
3131
spec/ruby/core/sizedqueue/shift_spec.rb
32+
spec/ruby/core/sizedqueue/append_spec.rb
33+
spec/ruby/core/sizedqueue/enq_spec.rb
34+
spec/ruby/core/sizedqueue/push_spec.rb
3235

3336
spec/ruby/core/module/refinements_spec.rb
3437
spec/ruby/core/module/undefined_instance_methods_spec.rb

src/main/java/org/truffleruby/core/queue/SizedQueue.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,37 @@ public boolean put(Object item) throws InterruptedException {
127127
}
128128
}
129129

130+
@TruffleBoundary
131+
public Object put(Object item, long timeoutMilliseconds) throws InterruptedException {
132+
lock.lock();
133+
try {
134+
if (closed) {
135+
return CLOSED;
136+
}
137+
138+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
139+
140+
while (size == capacity) {
141+
final long currentTimeout = deadline - System.currentTimeMillis();
142+
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canAdd, currentTimeout,
143+
TimeUnit.MILLISECONDS);
144+
145+
if (!signalled) {
146+
return false;
147+
}
148+
149+
if (closed) {
150+
return CLOSED;
151+
}
152+
}
153+
154+
doAdd(item);
155+
return true;
156+
} finally {
157+
lock.unlock();
158+
}
159+
}
160+
130161
private void doAdd(Object item) {
131162
items[addEnd] = item;
132163
addEnd++;
@@ -144,17 +175,20 @@ public Object poll(long timeoutMilliseconds) throws InterruptedException {
144175
lock.lock();
145176

146177
try {
147-
if (size == 0) {
148-
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, timeoutMilliseconds,
178+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
179+
180+
while (size == 0) {
181+
final long currentTimeout = deadline - System.currentTimeMillis();
182+
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, currentTimeout,
149183
TimeUnit.MILLISECONDS);
150184

151-
if (!signalled) { // timed out
185+
if (!signalled) { // Timed out.
152186
return null;
153187
}
188+
}
154189

155-
if (closed) {
156-
return CLOSED;
157-
}
190+
if (closed) {
191+
return CLOSED;
158192
}
159193

160194
return doTake();

src/main/java/org/truffleruby/core/queue/SizedQueueNodes.java

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,27 +100,29 @@ protected int max(RubySizedQueue self) {
100100

101101
}
102102

103-
@CoreMethod(names = { "push", "<<", "enq" }, required = 1, optional = 1)
104-
public abstract static class SizedQueuePushNode extends CoreMethodArrayArgumentsNode {
103+
@Primitive(name = "sized_queue_push")
104+
public abstract static class SizedQueuePushNode extends PrimitiveArrayArgumentsNode {
105105

106106
@Specialization
107-
protected RubySizedQueue doPush(RubySizedQueue self, final Object value, Object maybeNonBlocking,
107+
protected Object doPush(
108+
RubySizedQueue self, final Object value, Object maybeNonBlocking, Object timeoutMilliseconds,
108109
@Cached BooleanCastWithDefaultNode booleanCastWithDefaultNode,
109110
@Cached PushNode pushNode) {
110111
final boolean nonBlocking = booleanCastWithDefaultNode.execute(this, maybeNonBlocking, false);
111-
return pushNode.execute(this, self, value, nonBlocking);
112+
return pushNode.execute(this, self, value, nonBlocking, timeoutMilliseconds);
112113
}
113114
}
114115

115116
@GenerateInline
116117
@GenerateCached(false)
117118
public abstract static class PushNode extends RubyBaseNode {
118119

119-
public abstract RubySizedQueue execute(Node node, RubySizedQueue self, final Object value, boolean nonBlocking);
120+
public abstract Object execute(Node node, RubySizedQueue self, final Object value, boolean nonBlocking,
121+
Object timeoutMilliseconds);
120122

121123
@Specialization(guards = "!nonBlocking")
122124
protected static RubySizedQueue pushBlocking(
123-
Node node, RubySizedQueue self, final Object value, boolean nonBlocking,
125+
Node node, RubySizedQueue self, final Object value, boolean nonBlocking, Nil timeoutMilliseconds,
124126
@Cached @Shared PropagateSharingNode propagateSharingNode) {
125127
final SizedQueue queue = self.queue;
126128

@@ -141,9 +143,41 @@ private static void doPushBlocking(Node node, final Object value, final SizedQue
141143
});
142144
}
143145

146+
@Specialization(guards = "!nonBlocking")
147+
protected static Object pushTimeout(
148+
Node node, RubySizedQueue self, final Object value, boolean nonBlocking, long timeoutMilliseconds,
149+
@Cached @Shared PropagateSharingNode propagateSharingNode) {
150+
final SizedQueue queue = self.queue;
151+
propagateSharingNode.execute(node, self, value);
152+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
153+
154+
boolean success = getContext(node).getThreadManager().runUntilResult(node, () -> {
155+
final long currentTimeout = deadline - System.currentTimeMillis();
156+
final Object result;
157+
158+
if (currentTimeout > 0) {
159+
result = queue.put(value, currentTimeout);
160+
} else {
161+
result = queue.put(value, 0);
162+
}
163+
164+
if (result == SizedQueue.CLOSED) {
165+
throw new RaiseException(getContext(node), coreExceptions(node).closedQueueError(node));
166+
}
167+
168+
return (boolean) result;
169+
});
170+
171+
if (success) {
172+
return self;
173+
}
174+
175+
return nil;
176+
}
177+
144178
@Specialization(guards = "nonBlocking")
145179
protected static RubySizedQueue pushNonBlock(
146-
Node node, RubySizedQueue self, final Object value, boolean nonBlocking,
180+
Node node, RubySizedQueue self, final Object value, boolean nonBlocking, Nil timeoutMilliseconds,
147181
@Cached @Shared PropagateSharingNode propagateSharingNode,
148182
@Cached InlinedBranchProfile errorProfile) {
149183
final SizedQueue queue = self.queue;

src/main/java/org/truffleruby/core/queue/UnsizedQueue.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,20 @@ public Object poll(long timeoutMilliseconds) throws InterruptedException {
110110
lock.lock();
111111

112112
try {
113-
if (takeEnd == null) {
114-
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, timeoutMilliseconds,
113+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
114+
115+
while (takeEnd == null) {
116+
final long currentTimeout = deadline - System.currentTimeMillis();
117+
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, currentTimeout,
115118
TimeUnit.MILLISECONDS);
116119

117-
if (!signalled) { // timed out
120+
if (!signalled) { // Timed out.
118121
return null;
119122
}
123+
}
120124

121-
if (closed) {
122-
return CLOSED;
123-
}
125+
if (closed) {
126+
return CLOSED;
124127
}
125128

126129
return doTake();

0 commit comments

Comments
 (0)