Skip to content

Commit 843615a

Browse files
committed
[GR-25903] Ensure SafepointManager#poll() is not called from the driving thread.
PullRequest: truffleruby/1940
2 parents 3345818 + 1f5fe48 commit 843615a

File tree

3 files changed

+41
-35
lines changed

3 files changed

+41
-35
lines changed

lib/truffle/truffle/cext.rb

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,16 +1606,9 @@ def rb_thread_call_with_gvl(function, data)
16061606
end
16071607

16081608
def rb_thread_call_without_gvl(function, data1, unblock, data2)
1609-
if unblock
1610-
unblocker = -> {
1611-
Truffle::Interop.execute_without_conversion(unblock, data2)
1612-
}
1613-
end
1614-
1615-
Primitive.call_without_c_mutex(
1616-
-> { Thread.current.unblock(
1617-
unblocker,
1618-
-> { function.call(data1) }) }, [])
1609+
Primitive.call_without_c_mutex(-> {
1610+
Primitive.call_with_unblocking_function(Thread.current, function, data1, unblock, data2)
1611+
}, [])
16191612
end
16201613

16211614
def rb_iterate(iteration, iterated_object, callback, callback_arg)

src/main/java/org/truffleruby/core/thread/ThreadNodes.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@
7676
import org.truffleruby.core.symbol.RubySymbol;
7777
import org.truffleruby.core.thread.ThreadManager.UnblockingAction;
7878
import org.truffleruby.core.thread.ThreadManager.UnblockingActionHolder;
79+
import org.truffleruby.interop.InteropNodes;
80+
import org.truffleruby.interop.TranslateInteropExceptionNode;
7981
import org.truffleruby.language.Nil;
8082
import org.truffleruby.language.NotProvided;
8183
import org.truffleruby.language.SafepointAction;
@@ -88,11 +90,14 @@
8890
import org.truffleruby.language.objects.shared.SharedObjects;
8991
import org.truffleruby.language.yield.YieldNode;
9092

93+
import com.oracle.truffle.api.CompilerDirectives;
9194
import com.oracle.truffle.api.CompilerDirectives.CompilationFinal;
9295
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
9396
import com.oracle.truffle.api.dsl.Cached;
9497
import com.oracle.truffle.api.dsl.ImportStatic;
9598
import com.oracle.truffle.api.dsl.Specialization;
99+
import com.oracle.truffle.api.interop.InteropException;
100+
import com.oracle.truffle.api.interop.InteropLibrary;
96101
import com.oracle.truffle.api.library.CachedLibrary;
97102
import com.oracle.truffle.api.nodes.Node;
98103
import com.oracle.truffle.api.object.Shape;
@@ -503,46 +508,49 @@ protected RubyThread wakeup(RubyThread rubyThread) {
503508
}
504509

505510
@NonStandard
506-
@CoreMethod(names = "unblock", required = 2)
507-
public abstract static class UnblockNode extends YieldingCoreMethodNode {
511+
@Primitive(name = "call_with_unblocking_function")
512+
public abstract static class CallWithUnblockingFunctionNode extends PrimitiveArrayArgumentsNode {
508513

509-
@Specialization
510-
protected Object unblock(RubyThread thread, Object unblocker, RubyProc runner,
511-
@Cached("createCountingProfile()") LoopConditionProfile loopProfile) {
514+
@Specialization(limit = "getCacheLimit()")
515+
protected Object call(RubyThread thread, Object function, Object arg, Object unblocker, Object unblockerArg,
516+
@CachedLibrary("function") InteropLibrary receivers,
517+
@Cached TranslateInteropExceptionNode translateInteropExceptionNode) {
512518
final ThreadManager threadManager = getContext().getThreadManager();
513519
final UnblockingAction unblockingAction;
514520
if (unblocker == nil) {
515521
unblockingAction = threadManager.getNativeCallUnblockingAction();
516522
} else {
517-
unblockingAction = makeUnblockingAction((RubyProc) unblocker);
523+
unblockingAction = makeUnblockingAction(unblocker, unblockerArg);
518524
}
519-
final UnblockingActionHolder actionHolder = threadManager.getActionHolder(Thread.currentThread());
520525

526+
final UnblockingActionHolder actionHolder = threadManager.getActionHolder(Thread.currentThread());
521527
final UnblockingAction oldAction = actionHolder.changeTo(unblockingAction);
528+
final ThreadStatus status = thread.status;
529+
thread.status = ThreadStatus.SLEEP;
522530
try {
523-
Object result;
524-
do {
525-
final ThreadStatus status = thread.status;
526-
thread.status = ThreadStatus.SLEEP;
527-
528-
try {
529-
result = yield(runner);
530-
} finally {
531-
thread.status = status;
532-
}
533-
} while (loopProfile.profile(result == null));
534-
535-
return result;
531+
return InteropNodes.execute(function, new Object[]{ arg }, receivers, translateInteropExceptionNode);
536532
} finally {
533+
thread.status = status;
537534
actionHolder.restore(oldAction);
535+
538536
}
539537
}
540538

541539
@TruffleBoundary
542-
private UnblockingAction makeUnblockingAction(RubyProc unblocker) {
543-
return () -> yield(unblocker);
540+
private UnblockingAction makeUnblockingAction(Object function, Object argument) {
541+
assert InteropLibrary.getUncached().isExecutable(function);
542+
return () -> {
543+
try {
544+
InteropLibrary.getUncached().execute(function, argument);
545+
} catch (InteropException e) {
546+
throw CompilerDirectives.shouldNotReachHere(e);
547+
}
548+
};
544549
}
545550

551+
protected int getCacheLimit() {
552+
return getContext().getOptions().DISPATCH_CACHE;
553+
}
546554
}
547555

548556
@CoreMethod(names = "list", onSingleton = true)

src/main/java/org/truffleruby/language/SafepointManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ private void poll(Node currentNode, boolean fromBlockingCall) {
108108

109109
@TruffleBoundary
110110
private void assumptionInvalidated(Node currentNode, boolean fromBlockingCall) {
111+
if (lock.isHeldByCurrentThread()) {
112+
throw CompilerDirectives.shouldNotReachHere("poll() should not be called by the driving thread");
113+
}
114+
111115
final RubyThread thread = context.getThreadManager().getCurrentThread();
112116
final InterruptMode interruptMode = thread.interruptMode;
113117

@@ -129,6 +133,8 @@ private void assumptionInvalidated(Node currentNode, boolean fromBlockingCall) {
129133

130134
@TruffleBoundary
131135
private SafepointAction step(Node currentNode, boolean isDrivingThread) {
136+
assert isDrivingThread == lock.isHeldByCurrentThread();
137+
132138
final RubyThread thread = context.getThreadManager().getCurrentThread();
133139

134140
// Wait for other threads to reach their safepoint
@@ -314,9 +320,8 @@ private void pauseAllThreadsAndExecute(Node currentNode, SafepointAction action,
314320
this.action = action;
315321
this.deferred = deferred;
316322

317-
/* this is a potential cause for race conditions, but we need to invalidate first so the interrupted threads see
318-
* the invalidation in poll() in their catch(InterruptedException) clause and wait on the barrier instead of
319-
* retrying their blocking action. */
323+
/* We need to invalidate first so the interrupted threads see the invalidation in poll() in their
324+
* catch(InterruptedException) clause and wait on the Phaser instead of retrying their blocking action. */
320325
assumption.invalidate();
321326
interruptOtherThreads();
322327

0 commit comments

Comments
 (0)