Skip to content

Commit bf44737

Browse files
committed
Add timeout argument to Thread::SizedQueue#pop.
1 parent f403dd5 commit bf44737

File tree

10 files changed

+113
-22
lines changed

10 files changed

+113
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Compatibility:
2323
* Add `String#byteindex` and `String#byterindex` (#3039, @itarato).
2424
* Add implementations of `rb_proc_call_with_block`, `rb_proc_call_kw`, `rb_proc_call_with_block_kw` and `rb_funcall_with_block_kw` (#3068, @andrykonchin).
2525
* Add optional `timeout` argument to `Thread::Queue#pop` (#3039, @itarato).
26+
* Add optional `timeout` argument to `Thread::SizedsQueue#pop` (#3039, @itarato).
2627

2728
Performance:
2829

lib/truffle/thread.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,18 @@ def pop(non_block = false, timeout: nil)
3535
Primitive.queue_pop(
3636
self,
3737
Primitive.as_boolean(non_block),
38-
Truffle::QueueOperations.validate_and_prepare_timeout(non_block, timeout))
38+
Truffle::QueueOperations.validate_and_prepare_timeout_in_milliseconds(non_block, timeout))
39+
end
40+
alias_method :shift, :pop
41+
alias_method :deq, :pop
42+
end
43+
44+
class SizedQueue
45+
def pop(non_block = false, timeout: nil)
46+
Primitive.sized_queue_pop(
47+
self,
48+
Primitive.as_boolean(non_block),
49+
Truffle::QueueOperations.validate_and_prepare_timeout_in_milliseconds(non_block, timeout))
3950
end
4051
alias_method :shift, :pop
4152
alias_method :deq, :pop

spec/ruby/core/sizedqueue/deq_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "SizedQueue#deq" do
56
it_behaves_like :queue_deq, :deq, -> { SizedQueue.new(10) }
67
end
8+
9+
describe "SizedQueue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(10); q.push(1); q.deq(timeout: v) }
12+
end
13+
end

spec/ruby/core/sizedqueue/pop_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "SizedQueue#pop" do
56
it_behaves_like :queue_deq, :pop, -> { SizedQueue.new(10) }
67
end
8+
9+
describe "SizedQueue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(10); q.push(1); q.pop(timeout: v) }
12+
end
13+
end
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "SizedQueue#shift" do
56
it_behaves_like :queue_deq, :shift, -> { SizedQueue.new(10) }
67
end
8+
9+
describe "SizedQueue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(10); q.push(1); q.shift(timeout: v) }
12+
end
13+
end

spec/ruby/shared/queue/deque.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@
123123
"can't set a timeout if non_block is enabled",
124124
)
125125
end
126+
127+
it "returns nil for a closed empty queue" do
128+
q = @object.call
129+
q.close
130+
q.send(@method, timeout: 0).should == nil
131+
end
126132
end
127133
end
128134

spec/truffleruby.next-specs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ spec/ruby/core/string/byterindex_spec.rb
2525
spec/ruby/core/queue/deq_spec.rb
2626
spec/ruby/core/queue/pop_spec.rb
2727
spec/ruby/core/queue/shift_spec.rb
28+
29+
spec/ruby/core/sizedqueue/deq_spec.rb
30+
spec/ruby/core/sizedqueue/pop_spec.rb
31+
spec/ruby/core/sizedqueue/shift_spec.rb

src/main/java/org/truffleruby/core/queue/SizedQueue.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import java.util.ArrayList;
1313
import java.util.Collection;
14+
import java.util.concurrent.TimeUnit;
1415
import java.util.concurrent.locks.Condition;
1516
import java.util.concurrent.locks.ReentrantLock;
1617

@@ -137,6 +138,40 @@ private void doAdd(Object item) {
137138
}
138139
}
139140

141+
@TruffleBoundary
142+
public Object poll(long timeoutMilliseconds) throws InterruptedException {
143+
lock.lock();
144+
145+
try {
146+
if (size == 0) {
147+
final boolean signalled = canTake.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
148+
149+
/* We need to check the interrupted flag here, while holding the lock and after the await(). Otherwise,
150+
* if another thread does {@code th.raise(exc); q.push(1)} like in core/thread/handle_interrupt_spec.rb
151+
* then we might miss the ThreadLocalAction and instead return, which would incorrectly delay the
152+
* ThreadLocalAction if under {@code Thread.handle_interrupt(Object => :on_blocking)}. The other thread
153+
* cannot wait on the Future, as that might block arbitrary long when side-effects are disabled on this
154+
* thread. */
155+
if (Thread.interrupted()) {
156+
throw new InterruptedException();
157+
}
158+
159+
if (!signalled) {
160+
// Timed out.
161+
return null;
162+
}
163+
164+
if (closed) {
165+
return CLOSED;
166+
}
167+
}
168+
169+
return doTake();
170+
} finally {
171+
lock.unlock();
172+
}
173+
}
174+
140175
@TruffleBoundary
141176
public Object poll() {
142177
lock.lock();

src/main/java/org/truffleruby/core/queue/SizedQueueNodes.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616
import com.oracle.truffle.api.dsl.Specialization;
1717
import com.oracle.truffle.api.profiles.InlinedBranchProfile;
1818
import org.truffleruby.annotations.CoreMethod;
19+
import org.truffleruby.annotations.Primitive;
1920
import org.truffleruby.builtins.CoreMethodArrayArgumentsNode;
2021
import org.truffleruby.builtins.CoreMethodNode;
2122
import org.truffleruby.annotations.CoreModule;
23+
import org.truffleruby.builtins.PrimitiveArrayArgumentsNode;
2224
import org.truffleruby.core.cast.BooleanCastWithDefaultNode;
2325
import org.truffleruby.core.klass.RubyClass;
2426
import org.truffleruby.core.thread.ThreadManager.BlockingAction;
27+
import org.truffleruby.language.Nil;
2528
import org.truffleruby.language.RubyBaseNodeWithExecute;
2629
import org.truffleruby.language.RubyNode;
2730
import org.truffleruby.annotations.Visibility;
@@ -154,18 +157,11 @@ protected RubySizedQueue pushNonBlock(RubySizedQueue self, final Object value, b
154157

155158
}
156159

157-
@CoreMethod(names = { "pop", "shift", "deq" }, optional = 1)
158-
@NodeChild(value = "queue", type = RubyNode.class)
159-
@NodeChild(value = "nonBlocking", type = RubyBaseNodeWithExecute.class)
160-
public abstract static class PopNode extends CoreMethodNode {
161-
162-
@CreateCast("nonBlocking")
163-
protected RubyBaseNodeWithExecute coerceToBoolean(RubyBaseNodeWithExecute nonBlocking) {
164-
return BooleanCastWithDefaultNode.create(false, nonBlocking);
165-
}
160+
@Primitive(name = "sized_queue_pop")
161+
public abstract static class PopNode extends PrimitiveArrayArgumentsNode {
166162

167163
@Specialization(guards = "!nonBlocking")
168-
protected Object popBlocking(RubySizedQueue self, boolean nonBlocking) {
164+
protected Object popBlocking(RubySizedQueue self, boolean nonBlocking, Nil timeoutMilliseconds) {
169165
final SizedQueue queue = self.queue;
170166

171167
final Object value = doPop(queue);
@@ -182,8 +178,31 @@ private Object doPop(SizedQueue queue) {
182178
return getContext().getThreadManager().runUntilResult(this, queue::take);
183179
}
184180

181+
@Specialization(guards = "!nonBlocking")
182+
protected Object popBlocking(RubySizedQueue self, boolean nonBlocking, long timeoutMilliseconds) {
183+
final SizedQueue queue = self.queue;
184+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
185+
186+
return getContext().getThreadManager().runUntilResult(this, () -> {
187+
final long currentTimeout = deadline - System.currentTimeMillis();
188+
final Object value;
189+
190+
if (currentTimeout > 0) {
191+
value = queue.poll(currentTimeout);
192+
} else {
193+
value = queue.poll();
194+
}
195+
196+
if (value == SizedQueue.CLOSED || value == null) {
197+
return nil;
198+
} else {
199+
return value;
200+
}
201+
});
202+
}
203+
185204
@Specialization(guards = "nonBlocking")
186-
protected Object popNonBlock(RubySizedQueue self, boolean nonBlocking,
205+
protected Object popNonBlock(RubySizedQueue self, boolean nonBlocking, Nil timeoutMilliseconds,
187206
@Cached InlinedBranchProfile errorProfile) {
188207
final SizedQueue queue = self.queue;
189208

src/main/ruby/truffleruby/core/truffle/queue_operations.rb

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,11 @@
1010

1111
module Truffle
1212
module QueueOperations
13-
def self.validate_and_prepare_timeout(non_block, timeout)
14-
if non_block
15-
raise ArgumentError, "can't set a timeout if non_block is enabled" unless Primitive.nil?(timeout)
16-
else
17-
unless Primitive.nil?(timeout)
18-
# Convert seconds to milliseconds.
19-
return (Truffle::Type.rb_num2dbl(timeout) * 1000).to_i
20-
end
21-
end
13+
def self.validate_and_prepare_timeout_in_milliseconds(non_block, timeout_seconds_or_nil)
14+
return timeout_seconds_or_nil if Primitive.nil?(timeout_seconds_or_nil)
15+
raise ArgumentError, "can't set a timeout if non_block is enabled" if non_block
2216

23-
timeout
17+
(Truffle::Type.rb_num2dbl(timeout_seconds_or_nil) * 1000).to_i
2418
end
2519
end
2620
end

0 commit comments

Comments
 (0)