Skip to content

Commit cd9bcad

Browse files
committed
[GR-44211] Use Env#newTruffleThreadBuilder for Ruby Fibers
PullRequest: truffleruby/3681
2 parents 4dea5a3 + 3637300 commit cd9bcad

File tree

11 files changed

+235
-120
lines changed

11 files changed

+235
-120
lines changed

spec/ruby/core/fiber/inspect_spec.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
require_relative '../../spec_helper'
2+
require 'fiber'
3+
4+
describe "Fiber#inspect" do
5+
describe "status" do
6+
it "is resumed for the root Fiber of a Thread" do
7+
inspected = Thread.new { Fiber.current.inspect }.value
8+
inspected.should =~ /\A#<Fiber:0x\h+ .*\(resumed\)>\z/
9+
end
10+
11+
it "is created for a Fiber which did not run yet" do
12+
inspected = Fiber.new {}.inspect
13+
inspected.should =~ /\A#<Fiber:0x\h+ .+ \(created\)>\z/
14+
end
15+
16+
it "is resumed for a Fiber which was resumed" do
17+
inspected = Fiber.new { Fiber.current.inspect }.resume
18+
inspected.should =~ /\A#<Fiber:0x\h+ .+ \(resumed\)>\z/
19+
end
20+
21+
ruby_version_is "3.0" do
22+
it "is resumed for a Fiber which was transferred" do
23+
inspected = Fiber.new { Fiber.current.inspect }.transfer
24+
inspected.should =~ /\A#<Fiber:0x\h+ .+ \(resumed\)>\z/
25+
end
26+
end
27+
28+
it "is suspended for a Fiber which was resumed and yielded" do
29+
inspected = Fiber.new { Fiber.yield }.tap(&:resume).inspect
30+
inspected.should =~ /\A#<Fiber:0x\h+ .+ \(suspended\)>\z/
31+
end
32+
33+
it "is terminated for a Fiber which has terminated" do
34+
inspected = Fiber.new {}.tap(&:resume).inspect
35+
inspected.should =~ /\A#<Fiber:0x\h+ .+ \(terminated\)>\z/
36+
end
37+
end
38+
end

spec/tags/core/exception/top_level_tags.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,3 @@ slow:An Exception reaching the top level with a custom backtrace is printed on S
33
slow:An Exception reaching the top level the Exception#cause is printed to STDERR with backtraces
44
slow:An Exception reaching the top level kills all threads and fibers, ensure clauses are only run for threads current fibers, not for suspended fibers with ensure on the root fiber
55
slow:An Exception reaching the top level kills all threads and fibers, ensure clauses are only run for threads current fibers, not for suspended fibers with ensure on non-root fiber
6-
fails(GR-44211):An Exception reaching the top level kills all threads and fibers, ensure clauses are only run for threads current fibers, not for suspended fibers with ensure on non-root fiber

src/main/java/org/truffleruby/RubyLanguage.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,17 @@ public void invalidateTracingAssumption() {
406406
tracingAssumption = tracingCyclicAssumption.getAssumption();
407407
}
408408

409+
private boolean multiThreading = false;
410+
411+
public boolean isMultiThreaded() {
412+
return multiThreading;
413+
}
414+
415+
@Override
416+
protected void initializeMultiThreading(RubyContext context) {
417+
this.multiThreading = true;
418+
}
419+
409420
@Override
410421
protected void initializeMultipleContexts() {
411422
LOGGER.fine("initializeMultipleContexts()");
@@ -619,8 +630,9 @@ public void initializeThread(RubyContext context, Thread thread) {
619630
.shouldNotReachHere("Ruby threads should be initialized on their Java thread");
620631
}
621632
context.getThreadManager().start(rubyThread, thread);
622-
} else {
623-
// Fiber
633+
} else { // (non-root) Fiber
634+
var fiber = this.rubyFiber.get(thread);
635+
rubyThread.setCurrentFiber(fiber);
624636
}
625637
return;
626638
}
@@ -657,7 +669,8 @@ public void disposeThread(RubyContext context, Thread thread) {
657669
}
658670
context.getThreadManager().cleanupThreadState(rubyThread, thread);
659671
} else { // (non-root) Fiber
660-
// Fibers are always cleaned up by their thread's cleanup with FiberManager#killOtherFibers()
672+
var fiber = this.rubyFiber.get(thread);
673+
context.fiberManager.cleanup(fiber, thread);
661674
}
662675
return;
663676
}

src/main/java/org/truffleruby/core/fiber/FiberManager.java

Lines changed: 83 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
*/
1010
package org.truffleruby.core.fiber;
1111

12+
import java.util.Objects;
1213
import java.util.concurrent.CountDownLatch;
1314

15+
import com.oracle.truffle.api.TruffleSafepoint.Interrupter;
1416
import org.truffleruby.core.fiber.RubyFiber.FiberStatus;
1517

1618
import com.oracle.truffle.api.TruffleContext;
@@ -60,27 +62,56 @@ public FiberManager(RubyLanguage language, RubyContext context) {
6062
public void initialize(RubyFiber fiber, boolean blocking, RubyProc block, Node currentNode) {
6163
final SourceSection sourceSection = block.getSharedMethodInfo().getSourceSection();
6264
fiber.sourceLocation = context.fileLine(sourceSection);
65+
fiber.body = block;
66+
fiber.initializeNode = currentNode;
6367
fiber.blocking = blocking;
68+
}
69+
70+
private void createThreadToReceiveFirstMessage(RubyFiber fiber, Node currentNode) {
71+
assert fiber.thread == null;
72+
RubyProc block = Objects.requireNonNull(fiber.body);
73+
fiber.body = null;
74+
Node initializeNode = Objects.requireNonNull(fiber.initializeNode);
75+
fiber.initializeNode = null;
6476

77+
var sourceSection = block.getSharedMethodInfo().getSourceSection();
6578
final TruffleContext truffleContext = context.getEnv().getContext();
6679

67-
context.getThreadManager().leaveAndEnter(truffleContext, currentNode, () -> {
68-
context.getThreadManager().spawnFiber(fiber, sourceSection,
69-
() -> fiberMain(context, fiber, block, currentNode));
70-
waitForInitialization(context, fiber, currentNode);
80+
truffleContext.leaveAndEnter(currentNode, Interrupter.THREAD_INTERRUPT, (unused) -> {
81+
Thread thread = context.getThreadManager().createFiberJavaThread(fiber, sourceSection,
82+
() -> beforeEnter(fiber, initializeNode),
83+
() -> fiberMain(context, fiber, block, initializeNode),
84+
() -> afterLeave(fiber), currentNode);
85+
fiber.thread = thread;
86+
thread.start();
87+
waitForInitializationUnentered(context, fiber, currentNode);
7188
return BlockingAction.SUCCESS;
72-
});
89+
}, null);
7390
}
7491

7592
/** Wait for full initialization of the new fiber */
76-
public static void waitForInitialization(RubyContext context, RubyFiber fiber, Node currentNode) {
93+
public static void waitForInitializationEntered(RubyContext context, RubyFiber fiber, Node currentNode) {
94+
assert context.getEnv().getContext().isEntered();
7795
final CountDownLatch initializedLatch = fiber.initializedLatch;
7896

79-
if (context.getEnv().getContext().isEntered()) {
80-
context.getThreadManager().runUntilResultKeepStatus(currentNode, CountDownLatch::await, initializedLatch);
81-
} else {
82-
context.getThreadManager().retryWhileInterrupted(currentNode, CountDownLatch::await, initializedLatch);
97+
context.getThreadManager().runUntilResultKeepStatus(currentNode, latch -> {
98+
latch.await();
99+
return BlockingAction.SUCCESS;
100+
}, initializedLatch);
101+
102+
final Throwable uncaughtException = fiber.uncaughtException;
103+
if (uncaughtException != null) {
104+
ExceptionOperations.rethrow(uncaughtException);
83105
}
106+
}
107+
108+
/** Wait for full initialization of the new fiber */
109+
public static void waitForInitializationUnentered(RubyContext context, RubyFiber fiber, Node currentNode)
110+
throws InterruptedException {
111+
assert !context.getEnv().getContext().isEntered();
112+
final CountDownLatch initializedLatch = fiber.initializedLatch;
113+
114+
initializedLatch.await();
84115

85116
final Throwable uncaughtException = fiber.uncaughtException;
86117
if (uncaughtException != null) {
@@ -90,23 +121,26 @@ public static void waitForInitialization(RubyContext context, RubyFiber fiber, N
90121

91122
private static final BranchProfile UNPROFILED = BranchProfile.getUncached();
92123

93-
private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Node currentNode) {
124+
private void beforeEnter(RubyFiber fiber, Node currentNode) {
94125
assert !fiber.isRootFiber() : "Root Fibers execute threadMain() and not fiberMain()";
95126
assertNotEntered("Fibers should start unentered to avoid triggering multithreading");
96127

97128
final Thread thread = Thread.currentThread();
98-
final TruffleContext truffleContext = context.getEnv().getContext();
99-
100129
start(fiber, thread);
101130

102131
// fully initialized
103132
fiber.initializedLatch.countDown();
104133

105-
final FiberMessage message = waitMessage(fiber, currentNode);
106-
fiber.rubyThread.setCurrentFiber(fiber);
134+
try {
135+
fiber.firstMessage = waitMessage(fiber, currentNode);
136+
} catch (InterruptedException e) {
137+
throw CompilerDirectives.shouldNotReachHere("unexpected interrupt in Fiber beforeEnter()");
138+
}
139+
}
107140

108-
// enter() polls so we need the current Fiber to be set before enter()
109-
final Object prev = truffleContext.enter(currentNode);
141+
private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Node currentNode) {
142+
final FiberMessage message = Objects.requireNonNull(fiber.firstMessage);
143+
fiber.firstMessage = null;
110144

111145
FiberMessage lastMessage = null;
112146
try {
@@ -134,18 +168,21 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
134168
final RuntimeException exception = ThreadManager.printInternalError(e);
135169
lastMessage = new FiberExceptionMessage(exception);
136170
} finally {
137-
final RubyFiber returnFiber = lastMessage == null ? null : getReturnFiber(fiber, currentNode, UNPROFILED);
171+
fiber.lastMessage = lastMessage;
172+
fiber.returnFiber = lastMessage == null ? null : getReturnFiber(fiber, currentNode, UNPROFILED);
138173

139174
// Perform all cleanup before resuming the parent Fiber
140175
// Make sure that other fibers notice we are dead before they gain control back
141176
fiber.status = FiberStatus.TERMINATED;
142177
// Leave context before addToMessageQueue() -> parent Fiber starts executing
143-
truffleContext.leave(currentNode, prev);
144-
cleanup(fiber, thread);
178+
}
179+
}
145180

146-
if (lastMessage != null) {
147-
addToMessageQueue(returnFiber, lastMessage);
148-
}
181+
private void afterLeave(RubyFiber fiber) {
182+
if (fiber.lastMessage != null) {
183+
addToMessageQueue(fiber.returnFiber, fiber.lastMessage);
184+
fiber.returnFiber = null;
185+
fiber.lastMessage = null;
149186
}
150187
}
151188

@@ -161,7 +198,7 @@ public RubyFiber getReturnFiber(RubyFiber currentFiber, Node currentNode, Branch
161198
return previousFiber;
162199
} else {
163200

164-
if (currentFiber == rootFiber) {
201+
if (currentFiber == rootFiber) { // Note: this is always false for the fiberMain() caller
165202
errorProfile.enter();
166203
throw new RaiseException(context, context.getCoreExceptions().yieldFromRootFiberError(currentNode));
167204
}
@@ -184,24 +221,9 @@ private void addToMessageQueue(RubyFiber fiber, FiberMessage message) {
184221

185222
/** Send the Java thread that represents this fiber to sleep until it receives a message. */
186223
@TruffleBoundary
187-
private FiberMessage waitMessage(RubyFiber fiber, Node currentNode) {
224+
private FiberMessage waitMessage(RubyFiber fiber, Node currentNode) throws InterruptedException {
188225
assertNotEntered("should have left context while waiting fiber message");
189-
190-
class State {
191-
final RubyFiber fiber;
192-
FiberMessage message;
193-
194-
State(RubyFiber fiber) {
195-
this.fiber = fiber;
196-
}
197-
}
198-
199-
final State state = new State(fiber);
200-
context.getThreadManager().retryWhileInterrupted(
201-
currentNode,
202-
s -> s.message = s.fiber.messageQueue.take(),
203-
state);
204-
return state.message;
226+
return fiber.messageQueue.take();
205227
}
206228

207229
private void assertNotEntered(String reason) {
@@ -287,26 +309,29 @@ public DescriptorAndArgs transferControlTo(RubyFiber fromFiber, RubyFiber toFibe
287309
@TruffleBoundary
288310
private FiberMessage resumeAndWait(RubyFiber fromFiber, RubyFiber toFiber, FiberOperation operation,
289311
ArgumentsDescriptor descriptor, Object[] args, Node currentNode) {
312+
313+
if (toFiber.body != null) {
314+
context.fiberManager.createThreadToReceiveFirstMessage(toFiber, currentNode);
315+
}
316+
290317
final TruffleContext truffleContext = context.getEnv().getContext();
291-
final FiberMessage message = context
292-
.getThreadManager()
293-
.leaveAndEnter(truffleContext, currentNode, () -> {
318+
final FiberMessage message = truffleContext.leaveAndEnter(currentNode, Interrupter.THREAD_INTERRUPT,
319+
(unused) -> {
294320
resume(fromFiber, toFiber, operation, descriptor, args);
295321
return waitMessage(fromFiber, currentNode);
296-
});
322+
}, null);
297323
fromFiber.rubyThread.setCurrentFiber(fromFiber);
298324
return message;
299325
}
300326

301327
@TruffleBoundary
302328
public void safepoint(RubyFiber fromFiber, RubyFiber fiber, SafepointAction action, Node currentNode) {
303329
final TruffleContext truffleContext = context.getEnv().getContext();
304-
final FiberResumeMessage returnMessage = (FiberResumeMessage) context
305-
.getThreadManager()
306-
.leaveAndEnter(truffleContext, currentNode, () -> {
330+
final FiberResumeMessage returnMessage = (FiberResumeMessage) truffleContext.leaveAndEnter(currentNode,
331+
Interrupter.THREAD_INTERRUPT, (unused) -> {
307332
addToMessageQueue(fiber, new FiberSafepointMessage(fromFiber, action));
308333
return waitMessage(fromFiber, currentNode);
309-
});
334+
}, null);
310335
fromFiber.rubyThread.setCurrentFiber(fromFiber);
311336

312337
if (returnMessage.getArgs() != SAFEPOINT_ARGS) {
@@ -315,7 +340,11 @@ public void safepoint(RubyFiber fromFiber, RubyFiber fiber, SafepointAction acti
315340
}
316341

317342
public void start(RubyFiber fiber, Thread javaThread) {
318-
fiber.thread = javaThread;
343+
if (fiber.isRootFiber()) {
344+
fiber.thread = javaThread;
345+
} else {
346+
// fiber.thread set by createThreadToReceiveFirstMessage()
347+
}
319348

320349
final RubyThread rubyThread = fiber.rubyThread;
321350

@@ -354,26 +383,23 @@ public void killOtherFibers(RubyThread thread) {
354383
boolean allowSideEffects = safepoint.setAllowSideEffects(false);
355384
try {
356385
final TruffleContext truffleContext = context.getEnv().getContext();
357-
context.getThreadManager().leaveAndEnter(truffleContext, DummyNode.INSTANCE, () -> {
386+
truffleContext.leaveAndEnter(DummyNode.INSTANCE, Interrupter.THREAD_INTERRUPT, (unused) -> {
358387
doKillOtherFibers(thread);
359388
return BlockingAction.SUCCESS;
360-
});
389+
}, null);
361390
} finally {
362391
safepoint.setAllowSideEffects(allowSideEffects);
363392
}
364393
}
365394

366-
private void doKillOtherFibers(RubyThread thread) {
395+
private void doKillOtherFibers(RubyThread thread) throws InterruptedException {
367396
for (RubyFiber fiber : thread.runningFibers) {
368397
if (!fiber.isRootFiber()) {
369398
addToMessageQueue(fiber, new FiberShutdownMessage());
370399

371400
// Wait for the Fiber to finish so we only run one Fiber at a time
372401
final CountDownLatch finishedLatch = fiber.finishedLatch;
373-
context.getThreadManager().retryWhileInterrupted(
374-
DummyNode.INSTANCE,
375-
CountDownLatch::await,
376-
finishedLatch);
402+
finishedLatch.await();
377403

378404
final Throwable uncaughtException = fiber.uncaughtException;
379405
if (uncaughtException != null) {

src/main/java/org/truffleruby/core/fiber/FiberNodes.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ protected RubyFiber allocate(RubyClass rubyClass) {
109109
getContext(),
110110
getLanguage(),
111111
thread,
112+
FiberStatus.CREATED,
112113
"<uninitialized>");
113114
AllocationTracing.trace(fiber, this);
114115
return fiber;
@@ -121,6 +122,13 @@ public abstract static class InitializeNode extends PrimitiveArrayArgumentsNode
121122
@TruffleBoundary
122123
@Specialization
123124
protected Object initialize(RubyFiber fiber, boolean blocking, RubyProc block) {
125+
if (!getContext().getEnv().isCreateThreadAllowed()) {
126+
// Because TruffleThreadBuilder#build denies it already, before the thread is even started.
127+
// The permission is called allowCreateThread, so it kind of makes sense.
128+
throw new RaiseException(getContext(),
129+
coreExceptions().securityError("fibers not allowed with allowCreateThread(false)", this));
130+
}
131+
124132
getContext().fiberManager.initialize(fiber, blocking, block, this);
125133
return nil;
126134
}

0 commit comments

Comments
 (0)