Skip to content

Commit 55964e1

Browse files
committed
Implement forwarding a safepoint to all Fibers of a Thread for SafepointPredicate.ALL_THREADS_AND_FIBERS
* Will also be useful for Fiber#backtrace added in Ruby 3.
1 parent b9a2905 commit 55964e1

File tree

3 files changed

+63
-8
lines changed

3 files changed

+63
-8
lines changed

src/main/java/org/truffleruby/core/fiber/FiberManager.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.truffleruby.core.proc.RubyProc;
2525
import org.truffleruby.core.thread.ThreadManager;
2626
import org.truffleruby.core.thread.ThreadManager.BlockingAction;
27+
import org.truffleruby.language.SafepointAction;
2728
import org.truffleruby.language.control.BreakException;
2829
import org.truffleruby.language.control.DynamicReturnException;
2930
import org.truffleruby.language.control.ExitException;
@@ -42,6 +43,7 @@
4243
public class FiberManager {
4344

4445
public static final String NAME_PREFIX = "Ruby Fiber";
46+
public static final Object[] SAFEPOINT_ARGS = new Object[]{ FiberSafepointMessage.class };
4547

4648
private final RubyLanguage language;
4749
private final RubyContext context;
@@ -106,6 +108,7 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
106108

107109
final Object prev = truffleContext.enter(currentNode);
108110
language.setupCurrentThread(thread, fiber.rubyThread);
111+
fiber.rubyThread.setCurrentFiber(fiber);
109112

110113
FiberMessage lastMessage = null;
111114
try {
@@ -201,7 +204,13 @@ private void assertNotEntered(String reason) {
201204

202205
@TruffleBoundary
203206
private Object[] handleMessage(RubyFiber fiber, FiberMessage message, Node currentNode) {
204-
fiber.rubyThread.setCurrentFiber(fiber);
207+
// Written as a loop to not grow the stack when processing guest safepoints
208+
while (message instanceof FiberSafepointMessage) {
209+
final FiberSafepointMessage safepointMessage = (FiberSafepointMessage) message;
210+
safepointMessage.action.run(fiber.rubyThread, currentNode);
211+
final RubyFiber sendingFiber = safepointMessage.sendingFiber;
212+
message = resumeAndWait(fiber, sendingFiber, FiberOperation.TRANSFER, SAFEPOINT_ARGS, currentNode);
213+
}
205214

206215
if (message instanceof FiberShutdownMessage) {
207216
throw new FiberShutdownException(currentNode);
@@ -210,13 +219,16 @@ private Object[] handleMessage(RubyFiber fiber, FiberMessage message, Node curre
210219
} else if (message instanceof FiberResumeMessage) {
211220
final FiberResumeMessage resumeMessage = (FiberResumeMessage) message;
212221
assert language.getCurrentThread() == resumeMessage.getSendingFiber().rubyThread;
213-
if (resumeMessage.getOperation() == FiberOperation.RESUME ||
214-
resumeMessage.getOperation() == FiberOperation.RAISE) {
222+
final FiberOperation operation = resumeMessage.getOperation();
223+
224+
if (operation == FiberOperation.RESUME || operation == FiberOperation.RAISE) {
215225
fiber.lastResumedByFiber = resumeMessage.getSendingFiber();
216226
}
217-
if (resumeMessage.getOperation() == FiberOperation.RAISE) {
227+
228+
if (operation == FiberOperation.RAISE) {
218229
throw new RaiseException(context, (RubyException) resumeMessage.getArgs()[0]);
219230
}
231+
220232
return resumeMessage.getArgs();
221233
} else {
222234
throw CompilerDirectives.shouldNotReachHere();
@@ -232,16 +244,38 @@ private void resume(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operati
232244
@TruffleBoundary
233245
public Object[] transferControlTo(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operation, Object[] args,
234246
Node currentNode) {
235-
final TruffleContext truffleContext = context.getEnv().getContext();
247+
final FiberMessage message = resumeAndWait(fromFiber, fiber, operation, args, currentNode);
248+
return handleMessage(fromFiber, message, currentNode);
249+
}
236250

251+
@TruffleBoundary
252+
private FiberMessage resumeAndWait(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operation, Object[] args,
253+
Node currentNode) {
254+
final TruffleContext truffleContext = context.getEnv().getContext();
237255
final FiberMessage message = context
238256
.getThreadManager()
239257
.leaveAndEnter(truffleContext, currentNode, () -> {
240258
resume(fromFiber, fiber, operation, args);
241259
return waitMessage(fromFiber, currentNode);
242260
});
261+
fromFiber.rubyThread.setCurrentFiber(fromFiber);
262+
return message;
263+
}
243264

244-
return handleMessage(fromFiber, message, currentNode);
265+
@TruffleBoundary
266+
public void safepoint(RubyFiber fromFiber, RubyFiber fiber, SafepointAction action, Node currentNode) {
267+
final TruffleContext truffleContext = context.getEnv().getContext();
268+
final FiberResumeMessage returnMessage = (FiberResumeMessage) context
269+
.getThreadManager()
270+
.leaveAndEnter(truffleContext, currentNode, () -> {
271+
addToMessageQueue(fiber, new FiberSafepointMessage(fromFiber, action));
272+
return waitMessage(fromFiber, currentNode);
273+
});
274+
fromFiber.rubyThread.setCurrentFiber(fromFiber);
275+
276+
if (returnMessage.getArgs() != SAFEPOINT_ARGS) {
277+
throw CompilerDirectives.shouldNotReachHere();
278+
}
245279
}
246280

247281
public void start(RubyFiber fiber, Thread javaThread) {
@@ -385,6 +419,16 @@ public Object[] getArgs() {
385419

386420
}
387421

422+
private static class FiberSafepointMessage implements FiberMessage {
423+
private final RubyFiber sendingFiber;
424+
private final SafepointAction action;
425+
426+
private FiberSafepointMessage(RubyFiber sendingFiber, SafepointAction action) {
427+
this.sendingFiber = sendingFiber;
428+
this.action = action;
429+
}
430+
}
431+
388432
/** Used to cleanup and terminate Fibers when the parent Thread dies. */
389433
private static class FiberShutdownException extends TerminationException {
390434
private static final long serialVersionUID = 1522270454305076317L;

src/main/java/org/truffleruby/language/SafepointAction.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.oracle.truffle.api.nodes.Node;
1515
import org.truffleruby.RubyContext;
1616
import org.truffleruby.RubyLanguage;
17+
import org.truffleruby.core.fiber.RubyFiber;
1718
import org.truffleruby.core.thread.RubyThread;
1819

1920
import java.util.Objects;
@@ -62,7 +63,17 @@ protected final void perform(Access access) {
6263
final RubyThread rubyThread = context.getThreadManager().getRubyThreadForJavaThread(access.getThread());
6364
if (filter.test(context, rubyThread, this)) {
6465
run(rubyThread, access.getLocation());
66+
67+
if (filter == SafepointPredicate.ALL_THREADS_AND_FIBERS) {
68+
final RubyFiber currentFiber = rubyThread.getCurrentFiber();
69+
for (RubyFiber fiber : rubyThread.runningFibers) {
70+
if (fiber != currentFiber) {
71+
context.fiberManager.safepoint(currentFiber, fiber, this, access.getLocation());
72+
}
73+
}
74+
}
6575
}
76+
6677
}
6778

6879
public boolean isSynchronous() {

src/main/ruby/truffleruby/core/main.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def to_s
5656
end
5757

5858
show_backtraces = -> {
59-
$stderr.puts 'All Thread backtraces:'
59+
$stderr.puts 'All Thread and Fiber backtraces:'
6060
Primitive.all_fibers_backtraces.each do |fiber, backtrace|
6161
$stderr.puts "#{fiber} of #{Primitive.fiber_thread(fiber)}", backtrace, nil
6262
end
@@ -68,7 +68,7 @@ def to_s
6868
if Truffle::Boot.get_option('platform-handle-interrupt')
6969
Primitive.vm_watch_signal 'INT', true, -> _signo do
7070
if Truffle::Boot.get_option('backtraces-on-interrupt')
71-
puts 'Interrupting...'
71+
$stderr.puts 'Interrupting...'
7272
show_backtraces.call
7373
end
7474

0 commit comments

Comments
 (0)