24
24
import org .truffleruby .core .proc .RubyProc ;
25
25
import org .truffleruby .core .thread .ThreadManager ;
26
26
import org .truffleruby .core .thread .ThreadManager .BlockingAction ;
27
+ import org .truffleruby .language .SafepointAction ;
27
28
import org .truffleruby .language .control .BreakException ;
28
29
import org .truffleruby .language .control .DynamicReturnException ;
29
30
import org .truffleruby .language .control .ExitException ;
42
43
public class FiberManager {
43
44
44
45
public static final String NAME_PREFIX = "Ruby Fiber" ;
46
+ public static final Object [] SAFEPOINT_ARGS = new Object []{ FiberSafepointMessage .class };
45
47
46
48
private final RubyLanguage language ;
47
49
private final RubyContext context ;
@@ -89,9 +91,7 @@ public static void waitForInitialization(RubyContext context, RubyFiber fiber, N
89
91
90
92
private void fiberMain (RubyContext context , RubyFiber fiber , RubyProc block , Node currentNode ) {
91
93
assert !fiber .isRootFiber () : "Root Fibers execute threadMain() and not fiberMain()" ;
92
-
93
- final boolean entered = !context .getOptions ().FIBER_LEAVE_CONTEXT ;
94
- assert entered == context .getEnv ().getContext ().isEntered ();
94
+ assertNotEntered ("Fibers should start unentered to avoid triggering multithreading" );
95
95
96
96
final Thread thread = Thread .currentThread ();
97
97
final SourceSection sourceSection = block .sharedMethodInfo .getSourceSection ();
@@ -104,15 +104,11 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
104
104
fiber .initializedLatch .countDown ();
105
105
106
106
final FiberMessage message = waitMessage (fiber , currentNode );
107
- final TruffleContext truffleContext = entered ? null : context .getEnv ().getContext ();
107
+ final TruffleContext truffleContext = context .getEnv ().getContext ();
108
108
109
- final Object prev ;
110
- if (!entered ) {
111
- prev = truffleContext .enter (currentNode );
112
- } else {
113
- prev = null ;
114
- }
109
+ final Object prev = truffleContext .enter (currentNode );
115
110
language .setupCurrentThread (thread , fiber .rubyThread );
111
+ fiber .rubyThread .setCurrentFiber (fiber );
116
112
117
113
FiberMessage lastMessage = null ;
118
114
try {
@@ -146,9 +142,7 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
146
142
fiber .alive = false ;
147
143
// Leave context before addToMessageQueue() -> parent Fiber starts executing
148
144
language .setupCurrentThread (thread , null );
149
- if (!entered ) {
150
- truffleContext .leave (currentNode , prev );
151
- }
145
+ truffleContext .leave (currentNode , prev );
152
146
cleanup (fiber , thread );
153
147
thread .setName (oldName );
154
148
@@ -205,12 +199,18 @@ class State {
205
199
}
206
200
207
201
private void assertNotEntered (String reason ) {
208
- assert !context .getOptions (). FIBER_LEAVE_CONTEXT || ! context . getEnv ().getContext ().isEntered () : reason ;
202
+ assert !context .getEnv ().getContext ().isEntered () : reason ;
209
203
}
210
204
211
205
@ TruffleBoundary
212
206
private Object [] handleMessage (RubyFiber fiber , FiberMessage message , Node currentNode ) {
213
- fiber .rubyThread .setCurrentFiber (fiber );
207
+ // Written as a loop to not grow the stack when processing guest safepoints
208
+ while (message instanceof FiberSafepointMessage ) {
209
+ final FiberSafepointMessage safepointMessage = (FiberSafepointMessage ) message ;
210
+ safepointMessage .action .run (fiber .rubyThread , currentNode );
211
+ final RubyFiber sendingFiber = safepointMessage .sendingFiber ;
212
+ message = resumeAndWait (fiber , sendingFiber , FiberOperation .TRANSFER , SAFEPOINT_ARGS , currentNode );
213
+ }
214
214
215
215
if (message instanceof FiberShutdownMessage ) {
216
216
throw new FiberShutdownException (currentNode );
@@ -219,13 +219,16 @@ private Object[] handleMessage(RubyFiber fiber, FiberMessage message, Node curre
219
219
} else if (message instanceof FiberResumeMessage ) {
220
220
final FiberResumeMessage resumeMessage = (FiberResumeMessage ) message ;
221
221
assert language .getCurrentThread () == resumeMessage .getSendingFiber ().rubyThread ;
222
- if (resumeMessage .getOperation () == FiberOperation .RESUME ||
223
- resumeMessage .getOperation () == FiberOperation .RAISE ) {
222
+ final FiberOperation operation = resumeMessage .getOperation ();
223
+
224
+ if (operation == FiberOperation .RESUME || operation == FiberOperation .RAISE ) {
224
225
fiber .lastResumedByFiber = resumeMessage .getSendingFiber ();
225
226
}
226
- if (resumeMessage .getOperation () == FiberOperation .RAISE ) {
227
+
228
+ if (operation == FiberOperation .RAISE ) {
227
229
throw new RaiseException (context , (RubyException ) resumeMessage .getArgs ()[0 ]);
228
230
}
231
+
229
232
return resumeMessage .getArgs ();
230
233
} else {
231
234
throw CompilerDirectives .shouldNotReachHere ();
@@ -241,16 +244,38 @@ private void resume(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operati
241
244
@ TruffleBoundary
242
245
public Object [] transferControlTo (RubyFiber fromFiber , RubyFiber fiber , FiberOperation operation , Object [] args ,
243
246
Node currentNode ) {
244
- final TruffleContext truffleContext = context .getEnv ().getContext ();
247
+ final FiberMessage message = resumeAndWait (fromFiber , fiber , operation , args , currentNode );
248
+ return handleMessage (fromFiber , message , currentNode );
249
+ }
245
250
251
+ @ TruffleBoundary
252
+ private FiberMessage resumeAndWait (RubyFiber fromFiber , RubyFiber fiber , FiberOperation operation , Object [] args ,
253
+ Node currentNode ) {
254
+ final TruffleContext truffleContext = context .getEnv ().getContext ();
246
255
final FiberMessage message = context
247
256
.getThreadManager ()
248
257
.leaveAndEnter (truffleContext , currentNode , () -> {
249
258
resume (fromFiber , fiber , operation , args );
250
259
return waitMessage (fromFiber , currentNode );
251
260
});
261
+ fromFiber .rubyThread .setCurrentFiber (fromFiber );
262
+ return message ;
263
+ }
252
264
253
- return handleMessage (fromFiber , message , currentNode );
265
+ @ TruffleBoundary
266
+ public void safepoint (RubyFiber fromFiber , RubyFiber fiber , SafepointAction action , Node currentNode ) {
267
+ final TruffleContext truffleContext = context .getEnv ().getContext ();
268
+ final FiberResumeMessage returnMessage = (FiberResumeMessage ) context
269
+ .getThreadManager ()
270
+ .leaveAndEnter (truffleContext , currentNode , () -> {
271
+ addToMessageQueue (fiber , new FiberSafepointMessage (fromFiber , action ));
272
+ return waitMessage (fromFiber , currentNode );
273
+ });
274
+ fromFiber .rubyThread .setCurrentFiber (fromFiber );
275
+
276
+ if (returnMessage .getArgs () != SAFEPOINT_ARGS ) {
277
+ throw CompilerDirectives .shouldNotReachHere ();
278
+ }
254
279
}
255
280
256
281
public void start (RubyFiber fiber , Thread javaThread ) {
@@ -394,6 +419,16 @@ public Object[] getArgs() {
394
419
395
420
}
396
421
422
+ private static class FiberSafepointMessage implements FiberMessage {
423
+ private final RubyFiber sendingFiber ;
424
+ private final SafepointAction action ;
425
+
426
+ private FiberSafepointMessage (RubyFiber sendingFiber , SafepointAction action ) {
427
+ this .sendingFiber = sendingFiber ;
428
+ this .action = action ;
429
+ }
430
+ }
431
+
397
432
/** Used to cleanup and terminate Fibers when the parent Thread dies. */
398
433
private static class FiberShutdownException extends TerminationException {
399
434
private static final long serialVersionUID = 1522270454305076317L ;
0 commit comments