Skip to content

Commit 460bb6a

Browse files
committed
[GR-6190] Do not trigger initializeMultiThreading() when creating Fibers
* Instead, leave the TruffleContext explicitly before switching to another Fiber. * Leave the SafepointManager before leaving the TruffleContext, as we cannot execute safepoint actions without being in the context. * Print internal errors escaping fiberMain() and the fiber pool.
1 parent 5726938 commit 460bb6a

File tree

6 files changed

+164
-48
lines changed

6 files changed

+164
-48
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ New features:
55
* Access to local variables of the interactive Binding via language bindings is now supported: `context.getBindings("ruby").putMember("my_var", 42);` (#2030).
66
* `VALUE`s in C extensions now expose the Ruby object when viewed in the debugger, as long as they have not been converted to native values.
77
* Signal handlers can now be run without triggering multi-threading.
8+
* Fibers no longer trigger Truffle multi-threading.
89

910
Bug fixes:
1011

src/main/java/org/truffleruby/core/fiber/FiberManager.java

Lines changed: 79 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.concurrent.ConcurrentHashMap;
1515
import java.util.concurrent.CountDownLatch;
1616

17+
import com.oracle.truffle.api.TruffleContext;
1718
import org.truffleruby.RubyContext;
1819
import org.truffleruby.RubyLanguage;
1920
import org.truffleruby.core.array.ArrayHelpers;
@@ -103,7 +104,7 @@ public RubyFiber createFiber(RubyLanguage language, RubyContext context, RubyThr
103104
public void initialize(RubyFiber fiber, RubyProc block, Node currentNode) {
104105
ThreadManager.FIBER_BEING_SPAWNED.set(fiber);
105106
try {
106-
context.getThreadManager().spawnFiber(() -> fiberMain(context, fiber, block, currentNode));
107+
context.getThreadManager().spawnFiber(fiber, () -> fiberMain(context, fiber, block, currentNode));
107108
waitForInitialization(context, fiber, currentNode);
108109
} finally {
109110
ThreadManager.FIBER_BEING_SPAWNED.remove();
@@ -125,7 +126,7 @@ public static void waitForInitialization(RubyContext context, RubyFiber fiber, N
125126
}
126127
}
127128

128-
private static final BranchProfile UNPROFILED = BranchProfile.create();
129+
private static final BranchProfile UNPROFILED = BranchProfile.getUncached();
129130

130131
private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Node currentNode) {
131132
assert fiber != rootFiber : "Root Fibers execute threadMain() and not fiberMain()";
@@ -135,16 +136,25 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
135136
final String oldName = thread.getName();
136137
thread.setName(NAME_PREFIX + " id=" + thread.getId() + " from " + RubyContext.fileLine(sourceSection));
137138

138-
start(fiber, thread);
139-
try {
139+
start(fiber, thread, false);
140+
141+
final TruffleContext truffleContext = context.getEnv().getContext();
142+
assert !truffleContext.isEntered();
140143

141-
final Object[] args = waitForResume(fiber);
144+
final FiberMessage message = waitMessage(fiber);
145+
final Object prev = truffleContext.enter(currentNode);
146+
context.getSafepointManager().enterThread(); // not done in start() above because the context was not entered
147+
try {
142148
final Object result;
143149
try {
150+
final Object[] args = handleMessage(fiber, message);
144151
result = ProcOperations.rootCall(block, args);
145152
} finally {
146153
// Make sure that other fibers notice we are dead before they gain control back
147154
fiber.alive = false;
155+
// Leave before resume/sendExceptionToParentFiber -> addToMessageQueue() -> parent Fiber starts executing
156+
context.getSafepointManager().leaveThread();
157+
truffleContext.leave(currentNode, prev);
148158
}
149159
resume(fiber, getReturnFiber(fiber, currentNode, UNPROFILED), FiberOperation.YIELD, result);
150160

@@ -164,8 +174,11 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
164174
fiber,
165175
new RaiseException(context, context.getCoreExceptions().unexpectedReturn(currentNode)),
166176
currentNode);
177+
} catch (Throwable e) {
178+
final RuntimeException exception = ThreadManager.printInternalError(e);
179+
sendExceptionToParentFiber(fiber, exception, currentNode);
167180
} finally {
168-
cleanup(fiber, thread);
181+
cleanup(fiber, thread, false);
169182
thread.setName(oldName);
170183
}
171184
}
@@ -193,16 +206,19 @@ public RubyFiber getReturnFiber(RubyFiber currentFiber, Node currentNode, Branch
193206

194207
@TruffleBoundary
195208
private void addToMessageQueue(RubyFiber fiber, FiberMessage message) {
209+
assert !context.getEnv().getContext().isEntered() : "should have left context when sending message to fiber";
196210
fiber.messageQueue.add(message);
197211
}
198212

199-
/** Send the Java thread that represents this fiber to sleep until it receives a resume or exit message. */
213+
/** Send the Java thread that represents this fiber to sleep until it receives a message. */
200214
@TruffleBoundary
201-
private Object[] waitForResume(RubyFiber fiber) {
202-
final FiberMessage message = context.getThreadManager().runUntilResultKeepStatus(
203-
null,
204-
() -> fiber.messageQueue.take());
215+
private FiberMessage waitMessage(RubyFiber fiber) {
216+
assert !context.getEnv().getContext().isEntered() : "should have left context while waiting fiber message";
217+
return ThreadManager.retryWhileInterrupted(fiber.messageQueue::take);
218+
}
205219

220+
@TruffleBoundary
221+
private Object[] handleMessage(RubyFiber fiber, FiberMessage message) {
206222
setCurrentFiber(fiber);
207223

208224
if (message instanceof FiberShutdownMessage) {
@@ -227,12 +243,31 @@ private void resume(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operati
227243
addToMessageQueue(fiber, new FiberResumeMessage(operation, fromFiber, args));
228244
}
229245

246+
@TruffleBoundary
230247
public Object[] transferControlTo(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operation, Object[] args) {
231-
resume(fromFiber, fiber, operation, args);
232-
return waitForResume(fromFiber);
248+
final TruffleContext truffleContext = context.getEnv().getContext();
249+
250+
final FiberMessage message;
251+
final boolean isRubyManagedThread = context.getThreadManager().isRubyManagedThread(Thread.currentThread());
252+
if (isRubyManagedThread) {
253+
context.getSafepointManager().leaveThread();
254+
}
255+
try {
256+
message = truffleContext.leaveAndEnter(null, () -> {
257+
resume(fromFiber, fiber, operation, args);
258+
return waitMessage(fromFiber);
259+
});
260+
} finally {
261+
if (isRubyManagedThread) {
262+
context.getSafepointManager().enterThread();
263+
}
264+
}
265+
266+
return handleMessage(fromFiber, message);
233267
}
234268

235-
public void start(RubyFiber fiber, Thread javaThread) {
269+
public void start(RubyFiber fiber, Thread javaThread, boolean entered) {
270+
assert entered == context.getEnv().getContext().isEntered();
236271
final ThreadManager threadManager = context.getThreadManager();
237272

238273
if (Thread.currentThread() == javaThread) {
@@ -249,31 +284,34 @@ public void start(RubyFiber fiber, Thread javaThread) {
249284

250285
runningFibers.add(fiber);
251286

252-
if (threadManager.isRubyManagedThread(javaThread)) {
287+
if (threadManager.isRubyManagedThread(javaThread) && Thread.currentThread() == javaThread && entered) {
253288
context.getSafepointManager().enterThread();
254289
}
255290

256291
// fully initialized
257292
fiber.initializedLatch.countDown();
258293
}
259294

260-
public void cleanup(RubyFiber fiber, Thread javaThread) {
295+
public void cleanup(RubyFiber fiber, Thread javaThread, boolean entered) {
296+
assert entered == context.getEnv().getContext().isEntered();
297+
final ThreadManager threadManager = context.getThreadManager();
298+
261299
fiber.alive = false;
262300

263-
if (context.getThreadManager().isRubyManagedThread(javaThread)) {
301+
if (threadManager.isRubyManagedThread(javaThread) && Thread.currentThread() == javaThread && entered) {
264302
context.getSafepointManager().leaveThread();
265303
}
266304

267-
context.getThreadManager().cleanupValuesForJavaThread(javaThread);
305+
threadManager.cleanupValuesForJavaThread(javaThread);
268306

269307
runningFibers.remove(fiber);
270308

271309
fiber.thread = null;
272310

273311
if (Thread.currentThread() == javaThread) {
274-
context.getThreadManager().rubyFiber.remove();
312+
threadManager.rubyFiber.remove();
275313
}
276-
context.getThreadManager().rubyFiberForeignMap.remove(javaThread);
314+
threadManager.rubyFiberForeignMap.remove(javaThread);
277315

278316
fiber.finishedLatch.countDown();
279317
}
@@ -283,13 +321,32 @@ public void killOtherFibers() {
283321
// All Fibers except the current one are in waitForResume(),
284322
// so sending a FiberShutdownMessage is enough to finish them.
285323
// This also avoids the performance cost of a safepoint.
324+
325+
// This method might not be executed on the rootFiber Java Thread but possibly on another Java Thread.
326+
327+
final TruffleContext truffleContext = context.getEnv().getContext();
328+
assert truffleContext.isEntered();
329+
assert context.getThreadManager().isRubyManagedThread(Thread.currentThread());
330+
331+
context.getSafepointManager().leaveThread();
332+
try {
333+
truffleContext.leaveAndEnter(null, () -> {
334+
doKillOtherFibers();
335+
return null;
336+
});
337+
} finally {
338+
context.getSafepointManager().enterThread();
339+
}
340+
}
341+
342+
private void doKillOtherFibers() {
286343
for (RubyFiber fiber : runningFibers) {
287344
if (fiber != rootFiber) {
288345
addToMessageQueue(fiber, new FiberShutdownMessage());
289346

290347
// Wait for the Fiber to finish so we only run one Fiber at a time
291348
final CountDownLatch finishedLatch = fiber.finishedLatch;
292-
context.getThreadManager().runUntilResultKeepStatus(null, () -> {
349+
ThreadManager.retryWhileInterrupted(() -> {
293350
finishedLatch.await();
294351
return BlockingAction.SUCCESS;
295352
});
@@ -300,7 +357,7 @@ public void killOtherFibers() {
300357
@TruffleBoundary
301358
public void shutdown(Thread javaThread) {
302359
killOtherFibers();
303-
cleanup(rootFiber, javaThread);
360+
cleanup(rootFiber, javaThread, true);
304361
}
305362

306363
public String getFiberDebugInfo() {

src/main/java/org/truffleruby/core/thread/ThreadManager.java

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,20 @@ public void restartMainThread(Thread mainJavaThread) {
151151
public static final ThreadLocal<RubyFiber> FIBER_BEING_SPAWNED = new ThreadLocal<>();
152152

153153
private Thread createFiberJavaThread(Runnable runnable) {
154-
RubyFiber fiber = FIBER_BEING_SPAWNED.get();
154+
final RubyFiber fiber = FIBER_BEING_SPAWNED.get();
155155
assert fiber != null;
156-
return createJavaThread(runnable, fiber);
156+
157+
if (context.isPreInitializing()) {
158+
throw new UnsupportedOperationException("fibers should not be created while pre-initializing the context");
159+
}
160+
161+
final Thread thread = new Thread(runnable); // context.getEnv().createUnenteredThread(runnable);
162+
thread.setUncaughtExceptionHandler((javaThread, throwable) -> {
163+
System.err.println("Throwable escaped Fiber pool thread:");
164+
throwable.printStackTrace();
165+
});
166+
rubyManagedThreads.add(thread);
167+
return thread;
157168
}
158169

159170
private Thread createJavaThread(Runnable runnable, RubyFiber fiber) {
@@ -168,26 +179,39 @@ private Thread createJavaThread(Runnable runnable, RubyFiber fiber) {
168179
}
169180

170181
final Thread thread = context.getEnv().createThread(runnable);
182+
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler(fiber));
183+
rubyManagedThreads.add(thread);
184+
return thread;
185+
}
171186

187+
private static Thread.UncaughtExceptionHandler uncaughtExceptionHandler(RubyFiber fiber) {
172188
assert fiber != null;
173-
thread.setUncaughtExceptionHandler((javaThread, throwable) -> {
189+
return (javaThread, throwable) -> {
190+
printInternalError(throwable);
174191
try {
175-
fiber.uncaughtException = throwable;
176-
fiber.initializedLatch.countDown();
177-
} catch (Throwable t) {
192+
// the Fiber is not yet initialized, unblock the caller and rethrow the exception to it
193+
if (fiber.initializedLatch.getCount() > 0) {
194+
fiber.uncaughtException = throwable;
195+
fiber.initializedLatch.countDown();
196+
}
197+
} catch (Throwable t) { // exception inside this UncaughtExceptionHandler
178198
t.initCause(throwable);
179-
t.printStackTrace();
180-
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(javaThread, t);
199+
printInternalError(t);
181200
}
182-
});
183-
184-
rubyManagedThreads.add(thread);
185-
return thread;
201+
};
186202
}
187203

188204
@SuppressFBWarnings("RV")
189-
public void spawnFiber(Runnable task) {
190-
fiberPool.submit(task);
205+
public void spawnFiber(RubyFiber fiber, Runnable task) {
206+
fiberPool.submit(() -> {
207+
try {
208+
task.run();
209+
} catch (Throwable t) {
210+
// Fibers are run on a thread-pool, so make sure any exception escaping
211+
// is handled here as the thread pool ignores exceptions.
212+
uncaughtExceptionHandler(fiber).uncaughtException(Thread.currentThread(), t);
213+
}
214+
});
191215
}
192216

193217
@TruffleBoundary
@@ -280,11 +304,7 @@ private void threadMain(RubyThread thread, Node currentNode, Supplier<Object> ta
280304
rethrowOnMainThread(currentNode, e);
281305
setThreadValue(thread, Nil.INSTANCE);
282306
} catch (Throwable e) {
283-
final String message = StringUtils
284-
.format("%s terminated with internal error:", Thread.currentThread().getName());
285-
final RuntimeException runtimeException = new RuntimeException(message, e);
286-
// Immediately print internal exceptions, in case they would cause a deadlock
287-
runtimeException.printStackTrace();
307+
final RuntimeException runtimeException = printInternalError(e);
288308
rethrowOnMainThread(currentNode, runtimeException);
289309
setThreadValue(thread, Nil.INSTANCE);
290310
} finally {
@@ -293,6 +313,15 @@ private void threadMain(RubyThread thread, Node currentNode, Supplier<Object> ta
293313
}
294314
}
295315

316+
public static RuntimeException printInternalError(Throwable e) {
317+
final String message = StringUtils
318+
.format("%s terminated with internal error:", Thread.currentThread().getName());
319+
final RuntimeException runtimeException = new RuntimeException(message, e);
320+
// Immediately print internal exceptions, in case they would cause a deadlock
321+
runtimeException.printStackTrace();
322+
return runtimeException;
323+
}
324+
296325
private void rethrowOnMainThread(Node currentNode, RuntimeException e) {
297326
context.getSafepointManager().pauseRubyThreadAndExecute(
298327
"rethrow " + e.getClass() + " to main thread",
@@ -361,7 +390,7 @@ private void start(RubyThread thread, Thread javaThread) {
361390
registerThread(thread);
362391

363392
final FiberManager fiberManager = thread.fiberManager;
364-
fiberManager.start(fiberManager.getRootFiber(), javaThread);
393+
fiberManager.start(fiberManager.getRootFiber(), javaThread, true);
365394
}
366395

367396
public void cleanup(RubyThread thread, Thread javaThread) {
@@ -441,6 +470,7 @@ public static <T> T retryWhileInterrupted(BlockingAction<T> action) {
441470

442471
@TruffleBoundary
443472
public <T> T runUntilResultKeepStatus(Node currentNode, BlockingAction<T> action) {
473+
assert context.getEnv().getContext().isEntered() : "Use retryWhileInterrupted() when not entered";
444474
T result = null;
445475

446476
do {

src/main/java/org/truffleruby/language/SafepointManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public void enterThread() {
8787
@TruffleBoundary
8888
public void leaveThread() {
8989
final Thread thread = Thread.currentThread();
90+
assert runningThreads.contains(thread);
9091

9192
phaser.arriveAndDeregister();
9293
if (!runningThreads.remove(thread)) {

src/test/java/org/truffleruby/ContextPermissionsTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,13 @@
1010
package org.truffleruby;
1111

1212
import org.graalvm.polyglot.Context;
13+
import org.graalvm.polyglot.Value;
1314
import org.junit.Assert;
1415
import org.junit.Test;
1516

17+
import static org.junit.Assert.assertEquals;
18+
import static org.junit.Assert.assertTrue;
19+
1620
public class ContextPermissionsTest {
1721

1822
@Test
@@ -77,4 +81,32 @@ public void testNoThreadsEnforcesSingleThreadedOption() throws Throwable {
7781
}
7882
}
7983

84+
@Test
85+
public void testFiberDoesNotTriggerMultiThreading() {
86+
try (Context context = Context.newBuilder("ruby").allowCreateThread(false).build()) {
87+
final Value array = context.eval(
88+
"ruby",
89+
"a = [1]; f = Fiber.new { a << 3; Fiber.yield; a << 5 }; a << 2; f.resume; a << 4; f.resume");
90+
assertTrue(array.hasArrayElements());
91+
assertEquals(5, array.getArraySize());
92+
for (int i = 0; i < 5; i++) {
93+
assertEquals(i + 1, array.getArrayElement(i).asInt());
94+
}
95+
}
96+
}
97+
98+
@Test
99+
public void testNestedFiberAndTerminateFiber() {
100+
try (Context context = Context.newBuilder("ruby").allowCreateThread(false).build()) {
101+
final Value array = context.eval(
102+
"ruby",
103+
"a = []; Fiber.new { a << 1; Fiber.new { a << 2; Fiber.yield; unreachable }.resume; a << 3 }.resume");
104+
assertTrue(array.hasArrayElements());
105+
assertEquals(3, array.getArraySize());
106+
for (int i = 0; i < 3; i++) {
107+
assertEquals(i + 1, array.getArrayElement(i).asInt());
108+
}
109+
}
110+
}
111+
80112
}

0 commit comments

Comments
 (0)