Skip to content

Commit c167083

Browse files
committed
[GR-6190] Do not trigger initializeMultiThreading() when creating Fibers
PullRequest: truffleruby/2430
2 parents ffeea56 + 589b3d6 commit c167083

File tree

6 files changed

+199
-54
lines changed

6 files changed

+199
-54
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ New features:
55
* Access to local variables of the interactive Binding via language bindings is now supported: `context.getBindings("ruby").putMember("my_var", 42);` (#2030).
66
* `VALUE`s in C extensions now expose the Ruby object when viewed in the debugger, as long as they have not been converted to native values.
77
* Signal handlers can now be run without triggering multi-threading.
8+
* Fibers no longer trigger Truffle multi-threading.
89

910
Bug fixes:
1011

src/main/java/org/truffleruby/core/fiber/FiberManager.java

Lines changed: 85 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.concurrent.ConcurrentHashMap;
1515
import java.util.concurrent.CountDownLatch;
1616

17+
import com.oracle.truffle.api.TruffleContext;
1718
import org.truffleruby.RubyContext;
1819
import org.truffleruby.RubyLanguage;
1920
import org.truffleruby.core.array.ArrayHelpers;
@@ -101,10 +102,15 @@ public RubyFiber createFiber(RubyLanguage language, RubyContext context, RubyThr
101102
}
102103

103104
public void initialize(RubyFiber fiber, RubyProc block, Node currentNode) {
105+
final TruffleContext truffleContext = context.getEnv().getContext();
106+
104107
ThreadManager.FIBER_BEING_SPAWNED.set(fiber);
105108
try {
106-
context.getThreadManager().spawnFiber(() -> fiberMain(context, fiber, block, currentNode));
107-
waitForInitialization(context, fiber, currentNode);
109+
context.getThreadManager().leaveAndEnter(truffleContext, currentNode, () -> {
110+
context.getThreadManager().spawnFiber(fiber, () -> fiberMain(context, fiber, block, currentNode));
111+
waitForInitialization(context, fiber, currentNode);
112+
return BlockingAction.SUCCESS;
113+
}, context.getThreadManager().isRubyManagedThread(Thread.currentThread()));
108114
} finally {
109115
ThreadManager.FIBER_BEING_SPAWNED.remove();
110116
}
@@ -114,18 +120,24 @@ public void initialize(RubyFiber fiber, RubyProc block, Node currentNode) {
114120
public static void waitForInitialization(RubyContext context, RubyFiber fiber, Node currentNode) {
115121
final CountDownLatch initializedLatch = fiber.initializedLatch;
116122

117-
context.getThreadManager().runUntilResultKeepStatus(currentNode, () -> {
123+
final BlockingAction<Boolean> blockingAction = () -> {
118124
initializedLatch.await();
119125
return BlockingAction.SUCCESS;
120-
});
126+
};
127+
128+
if (context.getEnv().getContext().isEntered()) {
129+
context.getThreadManager().runUntilResultKeepStatus(currentNode, blockingAction);
130+
} else {
131+
context.getThreadManager().retryWhileInterrupted(blockingAction);
132+
}
121133

122134
final Throwable uncaughtException = fiber.uncaughtException;
123135
if (uncaughtException != null) {
124136
ExceptionOperations.rethrow(uncaughtException);
125137
}
126138
}
127139

128-
private static final BranchProfile UNPROFILED = BranchProfile.create();
140+
private static final BranchProfile UNPROFILED = BranchProfile.getUncached();
129141

130142
private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Node currentNode) {
131143
assert fiber != rootFiber : "Root Fibers execute threadMain() and not fiberMain()";
@@ -135,16 +147,30 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
135147
final String oldName = thread.getName();
136148
thread.setName(NAME_PREFIX + " id=" + thread.getId() + " from " + RubyContext.fileLine(sourceSection));
137149

138-
start(fiber, thread);
139-
try {
150+
start(fiber, thread, false);
151+
152+
final TruffleContext truffleContext = context.getEnv().getContext();
153+
assert !truffleContext.isEntered();
140154

141-
final Object[] args = waitForResume(fiber);
155+
final Object prev = truffleContext.enter(currentNode); // enter and leave now to workaround GR-29773
156+
context.getSafepointManager().enterThread(); // not done in start() above because the context was not entered
157+
final FiberMessage message = context.getThreadManager().leaveAndEnter(truffleContext, currentNode, () -> {
158+
// fully initialized
159+
fiber.initializedLatch.countDown();
160+
return waitMessage(fiber);
161+
}, true);
162+
163+
try {
142164
final Object result;
143165
try {
166+
final Object[] args = handleMessage(fiber, message);
144167
result = ProcOperations.rootCall(block, args);
145168
} finally {
146169
// Make sure that other fibers notice we are dead before they gain control back
147170
fiber.alive = false;
171+
// Leave before resume/sendExceptionToParentFiber -> addToMessageQueue() -> parent Fiber starts executing
172+
context.getSafepointManager().leaveThread();
173+
truffleContext.leave(currentNode, prev);
148174
}
149175
resume(fiber, getReturnFiber(fiber, currentNode, UNPROFILED), FiberOperation.YIELD, result);
150176

@@ -164,8 +190,11 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
164190
fiber,
165191
new RaiseException(context, context.getCoreExceptions().unexpectedReturn(currentNode)),
166192
currentNode);
193+
} catch (Throwable e) {
194+
final RuntimeException exception = ThreadManager.printInternalError(e);
195+
sendExceptionToParentFiber(fiber, exception, currentNode);
167196
} finally {
168-
cleanup(fiber, thread);
197+
cleanup(fiber, thread, false);
169198
thread.setName(oldName);
170199
}
171200
}
@@ -193,16 +222,19 @@ public RubyFiber getReturnFiber(RubyFiber currentFiber, Node currentNode, Branch
193222

194223
@TruffleBoundary
195224
private void addToMessageQueue(RubyFiber fiber, FiberMessage message) {
225+
assert !context.getEnv().getContext().isEntered() : "should have left context when sending message to fiber";
196226
fiber.messageQueue.add(message);
197227
}
198228

199-
/** Send the Java thread that represents this fiber to sleep until it receives a resume or exit message. */
229+
/** Send the Java thread that represents this fiber to sleep until it receives a message. */
200230
@TruffleBoundary
201-
private Object[] waitForResume(RubyFiber fiber) {
202-
final FiberMessage message = context.getThreadManager().runUntilResultKeepStatus(
203-
null,
204-
() -> fiber.messageQueue.take());
231+
private FiberMessage waitMessage(RubyFiber fiber) {
232+
assert !context.getEnv().getContext().isEntered() : "should have left context while waiting fiber message";
233+
return context.getThreadManager().retryWhileInterrupted(fiber.messageQueue::take);
234+
}
205235

236+
@TruffleBoundary
237+
private Object[] handleMessage(RubyFiber fiber, FiberMessage message) {
206238
setCurrentFiber(fiber);
207239

208240
if (message instanceof FiberShutdownMessage) {
@@ -227,12 +259,21 @@ private void resume(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operati
227259
addToMessageQueue(fiber, new FiberResumeMessage(operation, fromFiber, args));
228260
}
229261

262+
@TruffleBoundary
230263
public Object[] transferControlTo(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operation, Object[] args) {
231-
resume(fromFiber, fiber, operation, args);
232-
return waitForResume(fromFiber);
264+
final TruffleContext truffleContext = context.getEnv().getContext();
265+
final boolean isRubyManagedThread = context.getThreadManager().isRubyManagedThread(Thread.currentThread());
266+
267+
final FiberMessage message = context.getThreadManager().leaveAndEnter(truffleContext, null, () -> {
268+
resume(fromFiber, fiber, operation, args);
269+
return waitMessage(fromFiber);
270+
}, isRubyManagedThread);
271+
272+
return handleMessage(fromFiber, message);
233273
}
234274

235-
public void start(RubyFiber fiber, Thread javaThread) {
275+
public void start(RubyFiber fiber, Thread javaThread, boolean entered) {
276+
assert entered == context.getEnv().getContext().isEntered();
236277
final ThreadManager threadManager = context.getThreadManager();
237278

238279
if (Thread.currentThread() == javaThread) {
@@ -249,31 +290,31 @@ public void start(RubyFiber fiber, Thread javaThread) {
249290

250291
runningFibers.add(fiber);
251292

252-
if (threadManager.isRubyManagedThread(javaThread)) {
293+
if (threadManager.isRubyManagedThread(javaThread) && Thread.currentThread() == javaThread && entered) {
253294
context.getSafepointManager().enterThread();
254295
}
255-
256-
// fully initialized
257-
fiber.initializedLatch.countDown();
258296
}
259297

260-
public void cleanup(RubyFiber fiber, Thread javaThread) {
298+
public void cleanup(RubyFiber fiber, Thread javaThread, boolean entered) {
299+
assert entered == context.getEnv().getContext().isEntered();
300+
final ThreadManager threadManager = context.getThreadManager();
301+
261302
fiber.alive = false;
262303

263-
if (context.getThreadManager().isRubyManagedThread(javaThread)) {
304+
if (threadManager.isRubyManagedThread(javaThread) && Thread.currentThread() == javaThread && entered) {
264305
context.getSafepointManager().leaveThread();
265306
}
266307

267-
context.getThreadManager().cleanupValuesForJavaThread(javaThread);
308+
threadManager.cleanupValuesForJavaThread(javaThread);
268309

269310
runningFibers.remove(fiber);
270311

271312
fiber.thread = null;
272313

273314
if (Thread.currentThread() == javaThread) {
274-
context.getThreadManager().rubyFiber.remove();
315+
threadManager.rubyFiber.remove();
275316
}
276-
context.getThreadManager().rubyFiberForeignMap.remove(javaThread);
317+
threadManager.rubyFiberForeignMap.remove(javaThread);
277318

278319
fiber.finishedLatch.countDown();
279320
}
@@ -283,24 +324,40 @@ public void killOtherFibers() {
283324
// All Fibers except the current one are in waitForResume(),
284325
// so sending a FiberShutdownMessage is enough to finish them.
285326
// This also avoids the performance cost of a safepoint.
327+
328+
// This method might not be executed on the rootFiber Java Thread but possibly on another Java Thread.
329+
330+
final TruffleContext truffleContext = context.getEnv().getContext();
331+
context.getThreadManager().leaveAndEnter(truffleContext, null, () -> {
332+
doKillOtherFibers();
333+
return BlockingAction.SUCCESS;
334+
}, true);
335+
}
336+
337+
private void doKillOtherFibers() {
286338
for (RubyFiber fiber : runningFibers) {
287339
if (fiber != rootFiber) {
288340
addToMessageQueue(fiber, new FiberShutdownMessage());
289341

290342
// Wait for the Fiber to finish so we only run one Fiber at a time
291343
final CountDownLatch finishedLatch = fiber.finishedLatch;
292-
context.getThreadManager().runUntilResultKeepStatus(null, () -> {
344+
context.getThreadManager().retryWhileInterrupted(() -> {
293345
finishedLatch.await();
294346
return BlockingAction.SUCCESS;
295347
});
348+
349+
final Throwable uncaughtException = fiber.uncaughtException;
350+
if (uncaughtException != null) {
351+
ExceptionOperations.rethrow(uncaughtException);
352+
}
296353
}
297354
}
298355
}
299356

300357
@TruffleBoundary
301358
public void shutdown(Thread javaThread) {
302359
killOtherFibers();
303-
cleanup(rootFiber, javaThread);
360+
cleanup(rootFiber, javaThread, true);
304361
}
305362

306363
public String getFiberDebugInfo() {

0 commit comments

Comments
 (0)