Skip to content

Commit 4dfcabd

Browse files
itaratoandrykonchin
authored andcommitted
Add timeout argument to Thread::SizedQueue#push
1 parent b6c229a commit 4dfcabd

File tree

10 files changed

+141
-19
lines changed

10 files changed

+141
-19
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Compatibility:
2929
* Add `Module#refinements` (#3039, @itarato).
3030
* Add `Refinement#refined_class` (#3039, @itarato).
3131
* Add `rb_hash_new_capa` function (#3039, @itarato).
32+
* Add `timeout` argument to `Thread::SizedQueue#push` (#3039, @itarato).
3233

3334
Performance:
3435

lib/truffle/thread.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,15 @@ def pop(non_block = false, timeout: nil)
5050
end
5151
alias_method :shift, :pop
5252
alias_method :deq, :pop
53+
54+
def push(value, non_block = false, timeout: nil)
55+
Primitive.sized_queue_push(
56+
self,
57+
value,
58+
Primitive.as_boolean(non_block),
59+
Truffle::QueueOperations.validate_and_prepare_timeout_in_milliseconds(non_block, timeout))
60+
end
61+
alias_method :<<, :push
62+
alias_method :enq, :push
5363
end
5464
end

spec/ruby/core/sizedqueue/append_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/enque'
33
require_relative '../../shared/sizedqueue/enque'
4+
require_relative '../../shared/types/rb_num2dbl_fails'
45

56
describe "SizedQueue#<<" do
67
it_behaves_like :queue_enq, :<<, -> { SizedQueue.new(10) }
@@ -9,3 +10,9 @@
910
describe "SizedQueue#<<" do
1011
it_behaves_like :sizedqueue_enq, :<<, -> n { SizedQueue.new(n) }
1112
end
13+
14+
describe "SizedQueue operations with timeout" do
15+
ruby_version_is "3.2" do
16+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(1); q.send(:<<, 1, timeout: v) }
17+
end
18+
end

spec/ruby/core/sizedqueue/enq_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/enque'
33
require_relative '../../shared/sizedqueue/enque'
4+
require_relative '../../shared/types/rb_num2dbl_fails'
45

56
describe "SizedQueue#enq" do
67
it_behaves_like :queue_enq, :enq, -> { SizedQueue.new(10) }
@@ -9,3 +10,9 @@
910
describe "SizedQueue#enq" do
1011
it_behaves_like :sizedqueue_enq, :enq, -> n { SizedQueue.new(n) }
1112
end
13+
14+
describe "SizedQueue operations with timeout" do
15+
ruby_version_is "3.2" do
16+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(1); q.enq(1, timeout: v) }
17+
end
18+
end

spec/ruby/core/sizedqueue/push_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require_relative '../../spec_helper'
22
require_relative '../../shared/queue/enque'
33
require_relative '../../shared/sizedqueue/enque'
4+
require_relative '../../shared/types/rb_num2dbl_fails'
45

56
describe "SizedQueue#push" do
67
it_behaves_like :queue_enq, :push, -> { SizedQueue.new(10) }
@@ -9,3 +10,9 @@
910
describe "SizedQueue#push" do
1011
it_behaves_like :sizedqueue_enq, :push, -> n { SizedQueue.new(n) }
1112
end
13+
14+
describe "SizedQueue operations with timeout" do
15+
ruby_version_is "3.2" do
16+
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = SizedQueue.new(1); q.push(1, timeout: v) }
17+
end
18+
end

spec/ruby/shared/sizedqueue/enque.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,25 @@
108108
"can't set a timeout if non_block is enabled",
109109
)
110110
end
111+
112+
it "raise ClosedQueueError when closed before enqueued" do
113+
q = @object.call(1)
114+
q.close
115+
-> { q.send(@method, 2, timeout: 1) }.should raise_error(ClosedQueueError, "queue closed")
116+
end
117+
118+
it "raise ClosedQueueError when getting closed during wait" do
119+
q = @object.call(1)
120+
q.push(1)
121+
122+
t = Thread.new {
123+
-> { q.push(1, timeout: 0.1) }.should raise_error(ClosedQueueError, "queue closed")
124+
}
125+
126+
sleep(0.05)
127+
q.close
128+
t.join
129+
end
111130
end
112131
end
113132
end

spec/truffleruby.next-specs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,6 @@ spec/ruby/core/module/refinements_spec.rb
3434
spec/ruby/core/refinement/refined_class_spec.rb
3535
spec/ruby/core/module/used_refinements_spec.rb
3636
spec/ruby/optional/capi/hash_spec.rb
37+
spec/ruby/core/sizedqueue/append_spec.rb
38+
spec/ruby/core/sizedqueue/enq_spec.rb
39+
spec/ruby/core/sizedqueue/push_spec.rb

src/main/java/org/truffleruby/core/queue/SizedQueue.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,37 @@ public boolean put(Object item) throws InterruptedException {
127127
}
128128
}
129129

130+
@TruffleBoundary
131+
public Object put(Object item, long timeoutMilliseconds) throws InterruptedException {
132+
lock.lock();
133+
try {
134+
if (closed) {
135+
return CLOSED;
136+
}
137+
138+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
139+
140+
while (size == capacity) {
141+
final long currentTimeout = deadline - System.currentTimeMillis();
142+
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canAdd, currentTimeout,
143+
TimeUnit.MILLISECONDS);
144+
145+
if (!signalled) {
146+
return false;
147+
}
148+
149+
if (closed) {
150+
return CLOSED;
151+
}
152+
}
153+
154+
doAdd(item);
155+
return true;
156+
} finally {
157+
lock.unlock();
158+
}
159+
}
160+
130161
private void doAdd(Object item) {
131162
items[addEnd] = item;
132163
addEnd++;
@@ -144,17 +175,20 @@ public Object poll(long timeoutMilliseconds) throws InterruptedException {
144175
lock.lock();
145176

146177
try {
147-
if (size == 0) {
148-
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, timeoutMilliseconds,
178+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
179+
180+
while (size == 0) {
181+
final long currentTimeout = deadline - System.currentTimeMillis();
182+
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, currentTimeout,
149183
TimeUnit.MILLISECONDS);
150184

151-
if (!signalled) { // timed out
185+
if (!signalled) { // Timed out.
152186
return null;
153187
}
188+
}
154189

155-
if (closed) {
156-
return CLOSED;
157-
}
190+
if (closed) {
191+
return CLOSED;
158192
}
159193

160194
return doTake();

src/main/java/org/truffleruby/core/queue/SizedQueueNodes.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,27 +100,29 @@ protected int max(RubySizedQueue self) {
100100

101101
}
102102

103-
@CoreMethod(names = { "push", "<<", "enq" }, required = 1, optional = 1)
104-
public abstract static class SizedQueuePushNode extends CoreMethodArrayArgumentsNode {
103+
@Primitive(name = "sized_queue_push")
104+
public abstract static class SizedQueuePushNode extends PrimitiveArrayArgumentsNode {
105105

106106
@Specialization
107-
protected RubySizedQueue doPush(RubySizedQueue self, final Object value, Object maybeNonBlocking,
107+
protected Object doPush(
108+
RubySizedQueue self, final Object value, Object maybeNonBlocking, Object timeoutMilliseconds,
108109
@Cached BooleanCastWithDefaultNode booleanCastWithDefaultNode,
109110
@Cached PushNode pushNode) {
110111
final boolean nonBlocking = booleanCastWithDefaultNode.execute(this, maybeNonBlocking, false);
111-
return pushNode.execute(this, self, value, nonBlocking);
112+
return pushNode.execute(this, self, value, nonBlocking, timeoutMilliseconds);
112113
}
113114
}
114115

115116
@GenerateInline
116117
@GenerateCached(false)
117118
public abstract static class PushNode extends RubyBaseNode {
118119

119-
public abstract RubySizedQueue execute(Node node, RubySizedQueue self, final Object value, boolean nonBlocking);
120+
public abstract Object execute(Node node, RubySizedQueue self, final Object value, boolean nonBlocking,
121+
Object timeoutMilliseconds);
120122

121123
@Specialization(guards = "!nonBlocking")
122124
protected static RubySizedQueue pushBlocking(
123-
Node node, RubySizedQueue self, final Object value, boolean nonBlocking,
125+
Node node, RubySizedQueue self, final Object value, boolean nonBlocking, Nil timeoutMilliseconds,
124126
@Cached @Shared PropagateSharingNode propagateSharingNode) {
125127
final SizedQueue queue = self.queue;
126128

@@ -141,9 +143,38 @@ private static void doPushBlocking(Node node, final Object value, final SizedQue
141143
});
142144
}
143145

146+
@Specialization(guards = "!nonBlocking")
147+
protected static Object pushBlocking(
148+
Node node, RubySizedQueue self, final Object value, boolean nonBlocking, long timeoutMilliseconds,
149+
@Cached @Shared PropagateSharingNode propagateSharingNode) {
150+
final SizedQueue queue = self.queue;
151+
propagateSharingNode.execute(node, self, value);
152+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
153+
154+
var success = getContext(node).getThreadManager().runUntilResult(node, () -> {
155+
final long currentTimeout = deadline - System.currentTimeMillis();
156+
final Object result;
157+
158+
if (currentTimeout > 0) {
159+
result = queue.put(value, currentTimeout);
160+
} else {
161+
result = queue.put(value, 0);
162+
}
163+
164+
if (result == SizedQueue.CLOSED) {
165+
throw new RaiseException(getContext(node), coreExceptions(node).closedQueueError(node));
166+
}
167+
return result;
168+
});
169+
if ((boolean) success) {
170+
return self;
171+
}
172+
return nil;
173+
}
174+
144175
@Specialization(guards = "nonBlocking")
145176
protected static RubySizedQueue pushNonBlock(
146-
Node node, RubySizedQueue self, final Object value, boolean nonBlocking,
177+
Node node, RubySizedQueue self, final Object value, boolean nonBlocking, Nil timeoutMilliseconds,
147178
@Cached @Shared PropagateSharingNode propagateSharingNode,
148179
@Cached InlinedBranchProfile errorProfile) {
149180
final SizedQueue queue = self.queue;

src/main/java/org/truffleruby/core/queue/UnsizedQueue.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,20 @@ public Object poll(long timeoutMilliseconds) throws InterruptedException {
110110
lock.lock();
111111

112112
try {
113-
if (takeEnd == null) {
114-
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, timeoutMilliseconds,
113+
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
114+
115+
while (takeEnd == null) {
116+
final long currentTimeout = deadline - System.currentTimeMillis();
117+
final boolean signalled = ConcurrentOperations.awaitAndCheckInterrupt(canTake, currentTimeout,
115118
TimeUnit.MILLISECONDS);
116119

117-
if (!signalled) { // timed out
120+
if (!signalled) { // Timed out.
118121
return null;
119122
}
123+
}
120124

121-
if (closed) {
122-
return CLOSED;
123-
}
125+
if (closed) {
126+
return CLOSED;
124127
}
125128

126129
return doTake();

0 commit comments

Comments
 (0)