Skip to content

Commit 38a72cb

Browse files
committed
Move queue pop to ruby and add timeout keyword argument.
1 parent 0f8092e commit 38a72cb

File tree

13 files changed

+141
-63
lines changed

13 files changed

+141
-63
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Compatibility:
2020
* Add `String#bytesplice` (#3039, @itarato).
2121
* Add `String#byteindex` and `String#byterindex` (#3039, @itarato).
2222
* Add implementations of `rb_proc_call_with_block`, `rb_proc_call_kw`, `rb_proc_call_with_block_kw` and `rb_funcall_with_block_kw` (#3068, @andrykonchin).
23+
* Add optional `timeout` argument to `Thread::Queue#pop` (#3039, @itarato).
2324

2425
Performance:
2526

lib/truffle/thread.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,15 @@ class ThreadError < StandardError
2929
class Thread
3030
Queue = ::Queue
3131
SizedQueue = ::SizedQueue
32+
33+
class Queue
34+
def pop(non_block = false, timeout: nil)
35+
Primitive.queue_pop(
36+
self,
37+
Primitive.as_boolean(non_block),
38+
Truffle::QueueOperations.validate_and_prepare_timeout(non_block, timeout))
39+
end
40+
alias_method :shift, :pop
41+
alias_method :deq, :pop
42+
end
3243
end

spec/ruby/core/queue/deq_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "Queue#deq" do
56
it_behaves_like :queue_deq, :deq, -> { Queue.new }
67
end
8+
9+
describe "Queue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = Queue.new; q.push(1); q.deq(timeout: v) }
12+
end
13+
end

spec/ruby/core/queue/pop_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "Queue#pop" do
56
it_behaves_like :queue_deq, :pop, -> { Queue.new }
67
end
8+
9+
describe "Queue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = Queue.new; q.push(1); q.pop(timeout: v) }
12+
end
13+
end

spec/ruby/core/queue/shift_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/deque'
3+
require_relative '../../shared/types/rb_num2dbl_fails'
34

45
describe "Queue#shift" do
56
it_behaves_like :queue_deq, :shift, -> { Queue.new }
67
end
8+
9+
describe "Queue operations with timeout" do
10+
ruby_version_is "3.2" do
11+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = Queue.new; q.push(1); q.shift(timeout: v) }
12+
end
13+
end

spec/ruby/shared/queue/deque.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@
3737
q.send(@method).should == 1
3838
end
3939

40+
it "converts false-ish for non_blocking to boolean" do
41+
q = @object.call
42+
q << 1
43+
q << 2
44+
45+
q.send(@method, false).should == 1
46+
q.send(@method, nil).should == 2
47+
end
48+
4049
it "returns nil for a closed empty queue" do
4150
q = @object.call
4251
q.close
@@ -143,5 +152,13 @@
143152
q.close
144153
-> { q.send(@method, true) }.should raise_error(ThreadError)
145154
end
155+
156+
it "converts true-ish non_blocking argument to true" do
157+
q = @object.call
158+
159+
-> { q.send(@method, true) }.should raise_error(ThreadError)
160+
-> { q.send(@method, 1) }.should raise_error(ThreadError)
161+
-> { q.send(@method, "") }.should raise_error(ThreadError)
162+
end
146163
end
147164
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#
2+
# Shared tests for rb_num2dbl related conversion failures.
3+
#
4+
# Usage example:
5+
# it_behaves_like :rb_num2dbl_fails, nil, -> v { o = A.new; o.foo(v) }
6+
#
7+
8+
describe :rb_num2dbl_fails, shared: true do
9+
it "fails if string is provided" do
10+
-> { @object.call("123") }.should raise_error(TypeError, "no implicit conversion to float from string")
11+
end
12+
13+
it "fails if boolean is provided" do
14+
-> { @object.call(true) }.should raise_error(TypeError, "no implicit conversion to float from true")
15+
-> { @object.call(false) }.should raise_error(TypeError, "no implicit conversion to float from false")
16+
end
17+
end

spec/truffleruby.next-specs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,7 @@ spec/ruby/core/string/bytesplice_spec.rb
2121

2222
spec/ruby/core/string/byteindex_spec.rb
2323
spec/ruby/core/string/byterindex_spec.rb
24+
25+
spec/ruby/core/queue/deq_spec.rb
26+
spec/ruby/core/queue/pop_spec.rb
27+
spec/ruby/core/queue/shift_spec.rb

src/main/java/org/truffleruby/core/CoreLibrary.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,6 +1053,7 @@ public boolean isTruffleBootMainMethod(SharedMethodInfo info) {
10531053
"/core/posix.rb",
10541054
"/core/main.rb",
10551055
"/core/post.rb",
1056+
"/core/truffle/queue_operations.rb",
10561057
POST_BOOT_FILE
10571058
};
10581059

src/main/java/org/truffleruby/core/queue/QueueNodes.java

Lines changed: 37 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,17 @@
1313
import com.oracle.truffle.api.library.CachedLibrary;
1414
import com.oracle.truffle.api.profiles.InlinedBranchProfile;
1515
import org.truffleruby.annotations.CoreMethod;
16+
import org.truffleruby.annotations.Primitive;
1617
import org.truffleruby.builtins.CoreMethodArrayArgumentsNode;
17-
import org.truffleruby.builtins.CoreMethodNode;
1818
import org.truffleruby.annotations.CoreModule;
19-
import org.truffleruby.builtins.NonStandard;
19+
import org.truffleruby.builtins.PrimitiveArrayArgumentsNode;
2020
import org.truffleruby.core.array.ArrayGuards;
2121
import org.truffleruby.core.array.RubyArray;
2222
import org.truffleruby.core.array.library.ArrayStoreLibrary;
23-
import org.truffleruby.core.cast.BooleanCastWithDefaultNode;
2423
import org.truffleruby.core.cast.ToANode;
2524
import org.truffleruby.core.klass.RubyClass;
25+
import org.truffleruby.language.Nil;
2626
import org.truffleruby.language.NotProvided;
27-
import org.truffleruby.language.RubyBaseNodeWithExecute;
28-
import org.truffleruby.language.RubyNode;
2927
import org.truffleruby.annotations.Visibility;
3028
import org.truffleruby.language.control.RaiseException;
3129
import org.truffleruby.language.objects.AllocationTracing;
@@ -34,8 +32,6 @@
3432
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
3533
import com.oracle.truffle.api.dsl.Cached;
3634
import com.oracle.truffle.api.dsl.Cached.Exclusive;
37-
import com.oracle.truffle.api.dsl.CreateCast;
38-
import com.oracle.truffle.api.dsl.NodeChild;
3935
import com.oracle.truffle.api.dsl.Specialization;
4036

4137
@CoreModule(value = "Queue", isClass = true)
@@ -73,18 +69,11 @@ protected RubyQueue push(RubyQueue self, final Object value) {
7369

7470
}
7571

76-
@CoreMethod(names = { "pop", "shift", "deq" }, optional = 1)
77-
@NodeChild(value = "queue", type = RubyNode.class)
78-
@NodeChild(value = "nonBlocking", type = RubyBaseNodeWithExecute.class)
79-
public abstract static class PopNode extends CoreMethodNode {
80-
81-
@CreateCast("nonBlocking")
82-
protected RubyBaseNodeWithExecute coerceToBoolean(RubyBaseNodeWithExecute nonBlocking) {
83-
return BooleanCastWithDefaultNode.create(false, nonBlocking);
84-
}
72+
@Primitive(name = "queue_pop")
73+
public abstract static class PopNode extends PrimitiveArrayArgumentsNode {
8574

8675
@Specialization(guards = "!nonBlocking")
87-
protected Object popBlocking(RubyQueue self, boolean nonBlocking,
76+
protected Object popBlocking(RubyQueue self, boolean nonBlocking, Nil timeoutMilliseconds,
8877
@Exclusive @Cached InlinedBranchProfile closedProfile) {
8978
final UnsizedQueue queue = self.queue;
9079

@@ -103,61 +92,48 @@ private Object doPop(UnsizedQueue queue) {
10392
return getContext().getThreadManager().runUntilResult(this, queue::take);
10493
}
10594

106-
@Specialization(guards = "nonBlocking")
107-
protected Object popNonBlock(RubyQueue self, boolean nonBlocking,
108-
@Exclusive @Cached InlinedBranchProfile errorProfile) {
95+
@Specialization(guards = "!nonBlocking")
96+
protected Object popBlocking(RubyQueue self, boolean nonBlocking, long timeoutMilliseconds,
97+
@Exclusive @Cached InlinedBranchProfile closedProfile) {
10998
final UnsizedQueue queue = self.queue;
99+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
110100

111-
final Object value = queue.poll();
112-
113-
if (value == null) {
114-
errorProfile.enter(this);
115-
throw new RaiseException(getContext(), coreExceptions().threadError("queue empty", this));
116-
} else {
117-
return value;
118-
}
119-
}
101+
var result = getContext().getThreadManager().runUntilResult(this, () -> {
102+
final long currentTimeout = deadline - System.currentTimeMillis();
103+
final Object value;
120104

121-
}
105+
if (currentTimeout > 0) {
106+
value = queue.poll(currentTimeout);
107+
} else {
108+
value = queue.poll();
109+
}
122110

123-
@NonStandard
124-
@CoreMethod(names = "receive_timeout", required = 1, visibility = Visibility.PRIVATE, lowerFixnum = 1)
125-
public abstract static class ReceiveTimeoutNode extends CoreMethodArrayArgumentsNode {
111+
if (value == UnsizedQueue.CLOSED) {
112+
closedProfile.enter(this);
113+
return nil;
114+
} else {
115+
return value;
116+
}
117+
});
126118

127-
@Specialization
128-
protected Object receiveTimeout(RubyQueue self, int duration) {
129-
return receiveTimeout(self, (double) duration);
119+
if (result == null) {
120+
return nil;
121+
}
122+
return result;
130123
}
131124

132-
@TruffleBoundary
133-
@Specialization
134-
protected Object receiveTimeout(RubyQueue self, double duration) {
125+
@Specialization(guards = "nonBlocking")
126+
protected Object popNonBlock(RubyQueue self, boolean nonBlocking, Nil timeoutMilliseconds,
127+
@Exclusive @Cached InlinedBranchProfile errorProfile) {
135128
final UnsizedQueue queue = self.queue;
136129

137-
final long durationInMillis = (long) (duration * 1000.0);
138-
final long start = System.currentTimeMillis();
139-
140-
return getContext().getThreadManager().runUntilResult(this, () -> {
141-
long now = System.currentTimeMillis();
142-
long waited = now - start;
143-
if (waited >= durationInMillis) {
144-
// Try again to make sure we at least tried once
145-
final Object result = queue.poll();
146-
return translateResult(result);
147-
}
148-
149-
final Object result = queue.poll(durationInMillis);
150-
return translateResult(result);
151-
});
152-
}
130+
final Object value = queue.poll();
153131

154-
private Object translateResult(Object result) {
155-
if (result == null) {
156-
return false;
157-
} else if (result == UnsizedQueue.CLOSED) {
158-
return nil;
132+
if (value == null) {
133+
errorProfile.enter(this);
134+
throw new RaiseException(getContext(), coreExceptions().threadError("queue empty", this));
159135
} else {
160-
return result;
136+
return value;
161137
}
162138
}
163139

0 commit comments

Comments
 (0)