Skip to content

Commit b7090ac

Browse files
committed
[GR-18163] Fix Thread.handle_interrupt and implement Thread[.#]pending_interrupt? (#2219)
PullRequest: truffleruby/2355
2 parents 3fb6552 + 52f48ca commit b7090ac

File tree

16 files changed

+364
-96
lines changed

16 files changed

+364
-96
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Bug fixes:
99
* `Range#to_a` wasn't working for `long` ranges (#2198, @tomstuart and @LillianZ).
1010
* Show the interleaved host and guest stacktrace for host exceptions (#2226).
1111
* Fix the label of the first location reported by `Thread#backtrace_locations` (#2229).
12+
* Fix `Thread.handle_interrupt` to defer non-pure interrupts until the end of the `handle_interrupt` block (#2219).
1213

1314
Compatibility:
1415

@@ -41,6 +42,7 @@ Compatibility:
4142
* Support buffer argument for `UDPSocket#recvfrom_nonblock` (#2209, @HoneyryderChuck).
4243
* Fixed `Integer#digits` implementation to handle more bases (#2224, #2225).
4344
* Support the `inherit` parameter for `Module#{private, protected, public}_method_defined?`.
45+
* Implement `Thread.pending_interrupt?` and `Thread#pending_interrupt?` (#2219).
4446

4547
Performance:
4648

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
require_relative '../../spec_helper'
2+
3+
describe "Thread.handle_interrupt" do
4+
def make_handle_interrupt_thread(interrupt_config, blocking = true)
5+
interrupt_class = Class.new(RuntimeError)
6+
7+
ScratchPad.record []
8+
9+
in_handle_interrupt = Queue.new
10+
can_continue = Queue.new
11+
12+
thread = Thread.new do
13+
begin
14+
Thread.handle_interrupt(interrupt_config) do
15+
begin
16+
in_handle_interrupt << true
17+
if blocking
18+
can_continue.pop
19+
else
20+
begin
21+
can_continue.pop(true)
22+
rescue ThreadError
23+
Thread.pass
24+
retry
25+
end
26+
end
27+
rescue interrupt_class
28+
ScratchPad << :interrupted
29+
end
30+
end
31+
rescue interrupt_class
32+
ScratchPad << :deferred
33+
end
34+
end
35+
36+
in_handle_interrupt.pop
37+
thread.raise interrupt_class, "interrupt"
38+
can_continue << true
39+
thread.join
40+
41+
ScratchPad.recorded
42+
end
43+
44+
before :each do
45+
Thread.pending_interrupt?.should == false # sanity check
46+
end
47+
48+
it "with :never defers interrupts until exiting the handle_interrupt block" do
49+
make_handle_interrupt_thread(RuntimeError => :never).should == [:deferred]
50+
end
51+
52+
it "with :on_blocking defers interrupts until the next blocking call" do
53+
make_handle_interrupt_thread(RuntimeError => :on_blocking).should == [:interrupted]
54+
make_handle_interrupt_thread({ RuntimeError => :on_blocking }, false).should == [:deferred]
55+
end
56+
57+
it "with :immediate handles interrupts immediately" do
58+
make_handle_interrupt_thread(RuntimeError => :immediate).should == [:interrupted]
59+
end
60+
61+
it "with :immediate immediately runs pending interrupts, before the block" do
62+
Thread.handle_interrupt(RuntimeError => :never) do
63+
current = Thread.current
64+
Thread.new {
65+
current.raise "interrupt immediate"
66+
}.join
67+
68+
Thread.pending_interrupt?.should == true
69+
-> {
70+
Thread.handle_interrupt(RuntimeError => :immediate) {
71+
flunk "not reached"
72+
}
73+
}.should raise_error(RuntimeError, "interrupt immediate")
74+
Thread.pending_interrupt?.should == false
75+
end
76+
end
77+
78+
it "also works with suspended Fibers and does not duplicate interrupts" do
79+
fiber = Fiber.new { Fiber.yield }
80+
fiber.resume
81+
82+
Thread.handle_interrupt(RuntimeError => :never) do
83+
current = Thread.current
84+
Thread.new {
85+
current.raise "interrupt with fibers"
86+
}.join
87+
88+
Thread.pending_interrupt?.should == true
89+
-> {
90+
Thread.handle_interrupt(RuntimeError => :immediate) {
91+
flunk "not reached"
92+
}
93+
}.should raise_error(RuntimeError, "interrupt with fibers")
94+
Thread.pending_interrupt?.should == false
95+
end
96+
97+
fiber.resume
98+
end
99+
100+
it "runs pending interrupts at the end of the block, even if there was an exception raised in the block" do
101+
executed = false
102+
-> {
103+
Thread.handle_interrupt(RuntimeError => :never) do
104+
current = Thread.current
105+
Thread.new {
106+
current.raise "interrupt exception"
107+
}.join
108+
109+
Thread.pending_interrupt?.should == true
110+
executed = true
111+
raise "regular exception"
112+
end
113+
}.should raise_error(RuntimeError, "interrupt exception")
114+
executed.should == true
115+
end
116+
117+
it "supports multiple pairs in the Hash" do
118+
make_handle_interrupt_thread(ArgumentError => :never, RuntimeError => :never).should == [:deferred]
119+
end
120+
end
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
require_relative '../../spec_helper'
2+
3+
describe "Thread.pending_interrupt?" do
4+
it "returns false if there are no pending interrupts, e.g., outside any Thread.handle_interrupt block" do
5+
Thread.pending_interrupt?.should == false
6+
end
7+
8+
it "returns true if there are pending interrupts, e.g., Thread#raise inside Thread.handle_interrupt" do
9+
executed = false
10+
-> {
11+
Thread.handle_interrupt(RuntimeError => :never) do
12+
Thread.pending_interrupt?.should == false
13+
14+
current = Thread.current
15+
Thread.new {
16+
current.raise "interrupt"
17+
}.join
18+
19+
Thread.pending_interrupt?.should == true
20+
executed = true
21+
end
22+
}.should raise_error(RuntimeError, "interrupt")
23+
executed.should == true
24+
Thread.pending_interrupt?.should == false
25+
end
26+
end
27+
28+
describe "Thread#pending_interrupt?" do
29+
it "returns whether the given threads has pending interrupts" do
30+
Thread.current.pending_interrupt?.should == false
31+
end
32+
end

spec/tags/truffle/methods_tags.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ fails:Public methods on Fiber should include to_s
5757
fails:Public methods on Mutex should not include marshal_dump
5858
fails:Public methods on Thread should not include freeze
5959
fails:Public methods on Thread should include add_trace_func
60-
fails:Public methods on Thread should include pending_interrupt?
6160
fails:Public methods on Thread should include set_trace_func
6261
fails:Public methods on Proc should not include block
6362
fails:Public methods on Proc should not include block=

src/main/java/org/truffleruby/core/VMPrimitiveNodes.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.truffleruby.core.cast.NameToJavaStringNode;
5353
import org.truffleruby.core.cast.ToRubyIntegerNode;
5454
import org.truffleruby.core.exception.RubyException;
55-
import org.truffleruby.core.fiber.FiberManager;
5655
import org.truffleruby.core.klass.RubyClass;
5756
import org.truffleruby.core.method.RubyMethod;
5857
import org.truffleruby.core.module.RubyModule;
@@ -64,8 +63,8 @@
6463
import org.truffleruby.core.string.RubyString;
6564
import org.truffleruby.core.string.StringNodes.MakeStringNode;
6665
import org.truffleruby.core.thread.RubyThread;
67-
import org.truffleruby.core.thread.ThreadManager;
6866
import org.truffleruby.language.RubyDynamicObject;
67+
import org.truffleruby.language.SafepointPredicate;
6968
import org.truffleruby.language.backtrace.Backtrace;
7069
import org.truffleruby.language.backtrace.BacktraceFormatter;
7170
import org.truffleruby.language.control.ExitException;
@@ -270,8 +269,6 @@ protected boolean watchSignalProc(Object signalString, RubyProc action,
270269
}
271270

272271
final RubyThread rootThread = context.getThreadManager().getRootThread();
273-
final FiberManager fiberManager = rootThread.fiberManager;
274-
final ThreadManager threadManager = context.getThreadManager();
275272

276273
// Workaround: we need to register with Truffle (which means going multithreaded),
277274
// so that NFI can get its context to call pthread_kill() (GR-7405).
@@ -296,14 +293,8 @@ protected boolean watchSignalProc(Object signalString, RubyProc action,
296293
try {
297294
context.getSafepointManager().pauseAllThreadsAndExecuteFromNonRubyThread(
298295
"Handling of signal " + signal,
299-
true,
300-
(rubyThread, currentNode) -> {
301-
if (rubyThread == rootThread &&
302-
threadManager.getRubyFiberFromCurrentJavaThread() == fiberManager
303-
.getCurrentFiber()) {
304-
ProcOperations.rootCall(action);
305-
}
306-
});
296+
SafepointPredicate.currentFiberOfThread(context, rootThread),
297+
(rubyThread, currentNode) -> ProcOperations.rootCall(action));
307298
} finally {
308299
truffleContext.leave(this, prev);
309300
}

src/main/java/org/truffleruby/core/thread/RubyThread.java

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111

1212
import java.util.ArrayList;
1313
import java.util.List;
14+
import java.util.Queue;
1415
import java.util.Set;
1516
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.LinkedBlockingQueue;
1618
import java.util.concurrent.atomic.AtomicBoolean;
1719
import java.util.concurrent.locks.Lock;
1820

21+
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
1922
import org.truffleruby.RubyContext;
2023
import org.truffleruby.RubyLanguage;
2124
import org.truffleruby.core.InterruptMode;
@@ -29,6 +32,7 @@
2932
import org.truffleruby.core.tracepoint.TracePointState;
3033
import org.truffleruby.language.Nil;
3134
import org.truffleruby.language.RubyDynamicObject;
35+
import org.truffleruby.language.SafepointAction;
3236
import org.truffleruby.language.objects.ObjectGraph;
3337
import org.truffleruby.language.objects.ObjectGraphNode;
3438
import org.truffleruby.language.threadlocal.ThreadLocalGlobals;
@@ -37,28 +41,31 @@
3741

3842
public class RubyThread extends RubyDynamicObject implements ObjectGraphNode {
3943

40-
public final ThreadLocalGlobals threadLocalGlobals;
41-
public volatile InterruptMode interruptMode;
42-
public volatile ThreadStatus status;
43-
public final List<Lock> ownedLocks;
44-
public FiberManager fiberManager;
45-
CountDownLatch finishedLatch;
44+
// Fields initialized here are initialized just after the super() call, and before the rest of the constructor
45+
public final ThreadLocalGlobals threadLocalGlobals = new ThreadLocalGlobals();
46+
public volatile InterruptMode interruptMode = InterruptMode.IMMEDIATE;
47+
public volatile ThreadStatus status = ThreadStatus.RUN;
48+
public final List<Lock> ownedLocks = new ArrayList<>();
49+
public final FiberManager fiberManager;
50+
CountDownLatch finishedLatch = new CountDownLatch(1);
4651
final RubyHash threadLocalVariables;
4752
final RubyHash recursiveObjects;
4853
final RubyHash recursiveObjectsSingle;
4954
final RubyRandomizer randomizer;
50-
public final TracePointState tracePointState;
55+
public final TracePointState tracePointState = new TracePointState();
5156
boolean reportOnException;
5257
boolean abortOnException;
53-
public volatile Thread thread;
54-
volatile RubyException exception;
55-
volatile Object value;
56-
public final AtomicBoolean wakeUp;
57-
volatile int priority;
58-
public ThreadLocalBuffer ioBuffer;
58+
public volatile Thread thread = null;
59+
volatile RubyException exception = null;
60+
volatile Object value = null;
61+
public final AtomicBoolean wakeUp = new AtomicBoolean(false);
62+
volatile int priority = Thread.NORM_PRIORITY;
63+
public ThreadLocalBuffer ioBuffer = ThreadLocalBuffer.NULL_BUFFER;
64+
// Needs to be a thread-safe queue because multiple Fibers of the same Thread might enqueue concurrently
65+
public final Queue<SafepointAction> pendingSafepointActions = newLinkedBlockingQueue();
5966
Object threadGroup;
6067
String sourceLocation;
61-
Object name;
68+
Object name = Nil.INSTANCE;
6269

6370
public RubyThread(
6471
RubyClass rubyClass,
@@ -70,31 +77,16 @@ public RubyThread(
7077
Object threadGroup,
7178
String sourceLocation) {
7279
super(rubyClass, shape);
73-
this.threadLocalGlobals = new ThreadLocalGlobals();
74-
this.interruptMode = InterruptMode.IMMEDIATE;
75-
this.status = ThreadStatus.RUN;
76-
this.ownedLocks = new ArrayList<>();
77-
this.finishedLatch = new CountDownLatch(1);
7880
this.threadLocalVariables = HashOperations.newEmptyHash(context, language);
7981
this.recursiveObjects = HashOperations.newEmptyHash(context, language);
8082
this.recursiveObjectsSingle = HashOperations.newEmptyHash(context, language);
8183
this.recursiveObjectsSingle.compareByIdentity = true;
82-
this.randomizer = RandomizerNodes.newRandomizer(
83-
context,
84-
language,
85-
false); // This random instance is only for this thread and thus does not need to be thread-safe
86-
this.tracePointState = new TracePointState();
84+
// This random instance is only for this thread and thus does not need to be thread-safe
85+
this.randomizer = RandomizerNodes.newRandomizer(context, language, false);
8786
this.reportOnException = reportOnException;
8887
this.abortOnException = abortOnException;
89-
this.thread = null;
90-
this.exception = null;
91-
this.value = null;
92-
this.wakeUp = new AtomicBoolean(false);
93-
this.priority = Thread.NORM_PRIORITY;
94-
this.ioBuffer = ThreadLocalBuffer.NULL_BUFFER;
9588
this.threadGroup = threadGroup;
9689
this.sourceLocation = sourceLocation;
97-
this.name = Nil.INSTANCE;
9890
// Initialized last as it captures `this`
9991
this.fiberManager = new FiberManager(language, context, this);
10092
}
@@ -105,4 +97,9 @@ public void getAdjacentObjects(Set<Object> reachable) {
10597
ObjectGraph.addProperty(reachable, name);
10698
}
10799

100+
@TruffleBoundary
101+
private static LinkedBlockingQueue<SafepointAction> newLinkedBlockingQueue() {
102+
return new LinkedBlockingQueue<>();
103+
}
104+
108105
}

src/main/java/org/truffleruby/core/thread/ThreadManager.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -667,17 +667,15 @@ private void doKillOtherThreads() {
667667
while (true) {
668668
try {
669669
final String reason = "kill other threads for shutdown";
670-
context.getSafepointManager().pauseAllThreadsAndExecute(reason, null, false, (thread, currentNode) -> {
671-
if (Thread.currentThread() != initiatingJavaThread) {
672-
final FiberManager fiberManager = thread.fiberManager;
673-
final RubyFiber fiber = getRubyFiberFromCurrentJavaThread();
674-
675-
if (fiberManager.getCurrentFiber() == fiber) {
670+
context.getSafepointManager().pauseAllThreadsAndExecute(
671+
reason,
672+
null,
673+
thread -> Thread.currentThread() != initiatingJavaThread &&
674+
getRubyFiberFromCurrentJavaThread() == thread.fiberManager.getCurrentFiber(),
675+
(thread, currentNode) -> {
676676
thread.status = ThreadStatus.ABORTING;
677677
throw new KillException();
678-
}
679-
}
680-
});
678+
});
681679
break; // Successfully executed the safepoint and sent the exceptions.
682680
} catch (RaiseException e) {
683681
final RubyException rubyException = e.getException();

0 commit comments

Comments
 (0)