Skip to content

Commit 8efa3fd

Browse files
committed
Refactor reference processing services to use a single queue/thread.
PullRequest: truffleruby/517
2 parents ff90570 + cc79e36 commit 8efa3fd

File tree

4 files changed

+98
-93
lines changed

4 files changed

+98
-93
lines changed

src/main/java/org/truffleruby/RubyContext.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.truffleruby.core.FinalizationService;
3030
import org.truffleruby.core.Hashing;
3131
import org.truffleruby.core.MarkingService;
32+
import org.truffleruby.core.ReferenceProcessingService.ReferenceProcessor;
3233
import org.truffleruby.core.array.ArrayOperations;
3334
import org.truffleruby.core.encoding.EncodingManager;
3435
import org.truffleruby.core.exception.CoreExceptions;
@@ -107,8 +108,9 @@ public class RubyContext {
107108
private final CodeLoader codeLoader = new CodeLoader(this);
108109
private final FeatureLoader featureLoader = new FeatureLoader(this);
109110
private final TraceManager traceManager;
110-
private final FinalizationService finalizationService = new FinalizationService(this);
111-
private final MarkingService markingService = new MarkingService(this);
111+
private final ReferenceProcessor referenceProcessor = new ReferenceProcessor(this);
112+
private final FinalizationService finalizationService = new FinalizationService(this, referenceProcessor);
113+
private final MarkingService markingService = new MarkingService(this, referenceProcessor);
112114
private final ObjectSpaceManager objectSpaceManager = new ObjectSpaceManager(this);
113115
private final SharedObjects sharedObjects = new SharedObjects(this);
114116
private final AtExitManager atExitManager = new AtExitManager(this);

src/main/java/org/truffleruby/core/FinalizationService.java

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.util.LinkedList;
1717

1818
import org.truffleruby.RubyContext;
19-
import org.truffleruby.core.thread.ThreadManager;
2019

2120
import com.oracle.truffle.api.object.DynamicObject;
2221

@@ -58,13 +57,15 @@ public static class FinalizerReference extends PhantomReference<Object> implemen
5857
* {@link FinalizationService} monitor, to avoid concurrent access.
5958
*/
6059
private final Deque<Finalizer> finalizers = new LinkedList<>();
60+
private final FinalizationService service;
6161

6262
/** The doubly-linked list of FinalizerReference, needed to collect finalizer Procs for ObjectSpace. */
6363
FinalizerReference next = null;
6464
FinalizerReference prev = null;
6565

66-
private FinalizerReference(Object object, ReferenceQueue<? super Object> queue) {
66+
private FinalizerReference(Object object, ReferenceQueue<? super Object> queue, FinalizationService service) {
6767
super(object, queue);
68+
this.service = service;
6869
}
6970

7071
public FinalizerReference getPrevious() {
@@ -110,52 +111,35 @@ private void collectRoots(Collection<DynamicObject> roots) {
110111
}
111112
}
112113
}
114+
115+
public ReferenceProcessingService<FinalizerReference> service() {
116+
return service;
117+
}
113118
}
114119

115120
/** The finalizer Ruby thread, spawned lazily. */
116-
public FinalizationService(RubyContext context) {
117-
super(context);
121+
public FinalizationService(RubyContext context, ReferenceProcessor referenceProcessor) {
122+
super(context, referenceProcessor);
118123
}
119124

120125
public synchronized FinalizerReference addFinalizer(Object object, FinalizerReference finalizerReference, Class<?> owner, Runnable action, DynamicObject root) {
121126

122127
if (finalizerReference == null) {
123-
finalizerReference = new FinalizerReference(object, processingQueue);
128+
finalizerReference = new FinalizerReference(object, referenceProcessor.processingQueue, this);
124129
add(finalizerReference);
125130
}
126131

127132
finalizerReference.addFinalizer(owner, action, root);
128133

129-
processReferenceQueue();
134+
referenceProcessor.processReferenceQueue();
130135
return finalizerReference;
131136
}
132137

133138
@Override
134-
protected void createProcessingThread() {
135-
final ThreadManager threadManager = context.getThreadManager();
136-
processingThread = threadManager.createBootThread(getThreadName());
137-
context.send(processingThread, "internal_thread_initialize");
138-
139-
threadManager.initialize(processingThread, null, getThreadName(), () -> {
140-
while (true) {
141-
final FinalizerReference finalizerReference =
142-
(FinalizerReference) threadManager.runUntilResult(null, processingQueue::remove);
143-
144-
processReference(finalizerReference);
145-
}
146-
});
147-
}
148-
149-
@Override
150-
protected String getThreadName() {
151-
return "finalizer";
152-
}
153-
154-
@Override
155-
protected void processReference(FinalizerReference finalizerReference) {
139+
protected void processReference(ProcessingReference<?> finalizerReference) {
156140
super.processReference(finalizerReference);
157141

158-
runCatchingErrors(this::processReferenceInternal, finalizerReference);
142+
runCatchingErrors(this::processReferenceInternal, (FinalizerReference) finalizerReference);
159143
}
160144

161145
protected void processReferenceInternal(FinalizerReference finalizerReference) {

src/main/java/org/truffleruby/core/MarkingService.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,14 @@ public static interface MarkerAction {
4848
public static class MarkerReference extends WeakReference<DynamicObject> implements ReferenceProcessingService.ProcessingReference<MarkerReference> {
4949

5050
private final MarkerAction action;
51+
private final MarkingService service;
5152
private MarkerReference next = null;
5253
private MarkerReference prev = null;
5354

54-
private MarkerReference(DynamicObject object, ReferenceQueue<? super Object> queue, MarkerAction action) {
55+
private MarkerReference(DynamicObject object, ReferenceQueue<? super Object> queue, MarkerAction action, MarkingService service) {
5556
super(object, queue);
5657
this.action = action;
58+
this.service = service;
5759
}
5860

5961
public MarkerReference getPrevious() {
@@ -71,6 +73,10 @@ public MarkerReference getNext() {
7173
public void setNext(MarkerReference next) {
7274
this.next = next;
7375
}
76+
77+
public ReferenceProcessingService<MarkerReference> service() {
78+
return service;
79+
}
7480
}
7581

7682
private static final int KEPT_COUNT_SIZE = 10_000;
@@ -81,8 +87,8 @@ public void setNext(MarkerReference next) {
8187

8288
private int counter = 0;
8389

84-
public MarkingService(RubyContext context) {
85-
super(context);
90+
public MarkingService(RubyContext context, ReferenceProcessor referenceProcessor) {
91+
super(context, referenceProcessor);
8692
}
8793

8894
public void keepObject(Object object) {
@@ -131,8 +137,7 @@ private synchronized void runAllMarkers() {
131137
}
132138

133139
public void addMarker(DynamicObject object, MarkerAction action) {
134-
add(new MarkerReference(object, processingQueue, action));
135-
processReferenceQueue();
140+
add(new MarkerReference(object, referenceProcessor.processingQueue, action, this));
136141
}
137142

138143
private void runMarker(MarkerReference markerReference) {
@@ -148,9 +153,4 @@ private void runMarkerInternal(MarkerReference markerReference) {
148153
}
149154
}
150155
}
151-
152-
@Override
153-
protected String getThreadName() {
154-
return "marker-finalizer";
155-
}
156156
}

src/main/java/org/truffleruby/core/ReferenceProcessingService.java

Lines changed: 71 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,50 +20,95 @@ public static interface ProcessingReference<R extends ProcessingReference<R>> {
2020
public R getNext();
2121

2222
public void setNext(R next);
23+
24+
public ReferenceProcessingService<R> service();
2325
}
2426

25-
/** The head of a doubly-linked list of FinalizerReference, needed to collect finalizer Procs for ObjectSpace. */
26-
private R first = null;
27+
public static class ReferenceProcessor {
28+
protected final ReferenceQueue<Object> processingQueue = new ReferenceQueue<>();
2729

28-
protected final ReferenceQueue<Object> processingQueue = new ReferenceQueue<>();
30+
protected DynamicObject processingThread;
31+
protected final RubyContext context;
2932

30-
protected DynamicObject processingThread;
31-
protected final RubyContext context;
33+
public ReferenceProcessor(RubyContext context) {
34+
this.context = context;
35+
}
3236

33-
public ReferenceProcessingService(RubyContext context) {
34-
this.context = context;
35-
}
37+
protected void processReferenceQueue() {
38+
if (context.getOptions().SINGLE_THREADED) {
39+
40+
drainReferenceQueue();
41+
42+
} else {
3643

37-
protected final void drainReferenceQueue() {
38-
while (true) {
39-
@SuppressWarnings("unchecked")
40-
final R reference = (R) processingQueue.poll();
44+
/*
45+
* We can't create a new thread while the context is initializing or finalizing, as
46+
* the polyglot API locks on creating new threads, and some core loading does things
47+
* such as stat files which could allocate memory that is marked to be automatically
48+
* freed and so would want to start the finalization thread. So don't start the
49+
* finalization thread if we are initializing. We will rely on some other finalizer
50+
* to be created to ever free this memory allocated during startup, but that's a
51+
* reasonable assumption and a low risk of leaking a tiny number of bytes if it
52+
* doesn't hold.
53+
*/
54+
55+
if (processingThread == null && !context.isPreInitializing() && context.isInitialized() && !context.isFinalizing()) {
56+
createProcessingThread();
57+
}
4158

42-
if (reference == null) {
43-
break;
4459
}
60+
}
61+
62+
protected void createProcessingThread() {
63+
final ThreadManager threadManager = context.getThreadManager();
64+
processingThread = threadManager.createBootThread(threadName());
65+
context.send(processingThread, "internal_thread_initialize");
4566

46-
processReference(reference);
67+
threadManager.initialize(processingThread, null, threadName(), () -> {
68+
while (true) {
69+
final ProcessingReference<?> reference = (ProcessingReference<?>) threadManager.runUntilResult(null, processingQueue::remove);
70+
71+
reference.service().processReference(reference);
72+
}
73+
});
4774
}
48-
}
4975

50-
protected void createProcessingThread() {
51-
final ThreadManager threadManager = context.getThreadManager();
52-
processingThread = threadManager.createBootThread(getThreadName());
53-
context.send(processingThread, "internal_thread_initialize");
76+
protected final String threadName() {
77+
return "Ruby-reference-processor";
78+
}
5479

55-
threadManager.initialize(processingThread, null, getThreadName(), () -> {
80+
protected final void drainReferenceQueue() {
5681
while (true) {
5782
@SuppressWarnings("unchecked")
58-
final R reference = (R) threadManager.runUntilResult(null, processingQueue::remove);
83+
final ProcessingReference<?> reference = (ProcessingReference<?>) processingQueue.poll();
84+
85+
if (reference == null) {
86+
break;
87+
}
5988

60-
processReference(reference);
89+
reference.service().processReference(reference);
6190
}
62-
});
91+
}
92+
6393
}
6494

65-
protected void processReference(R reference) {
66-
remove(reference);
95+
/**
96+
* The head of a doubly-linked list of FinalizerReference, needed to collect finalizer Procs for
97+
* ObjectSpace.
98+
*/
99+
private R first = null;
100+
101+
protected final ReferenceProcessor referenceProcessor;
102+
protected final RubyContext context;
103+
104+
public ReferenceProcessingService(RubyContext context, ReferenceProcessor referenceProcessor) {
105+
this.context = context;
106+
this.referenceProcessor = referenceProcessor;
107+
}
108+
109+
@SuppressWarnings("unchecked")
110+
protected void processReference(ProcessingReference<?> reference) {
111+
remove((R) reference);
67112
}
68113

69114
protected void runCatchingErrors(Consumer<R> action, R reference) {
@@ -81,32 +126,6 @@ protected void runCatchingErrors(Consumer<R> action, R reference) {
81126
}
82127
}
83128

84-
protected abstract String getThreadName();
85-
86-
protected void processReferenceQueue() {
87-
if (context.getOptions().SINGLE_THREADED) {
88-
89-
drainReferenceQueue();
90-
91-
} else {
92-
93-
/*
94-
* We can't create a new thread while the context is initializing or finalizing, as the
95-
* polyglot API locks on creating new threads, and some core loading does things such as
96-
* stat files which could allocate memory that is marked to be automatically freed and
97-
* so would want to start the finalization thread. So don't start the finalization
98-
* thread if we are initializing. We will rely on some other finalizer to be created to
99-
* ever free this memory allocated during startup, but that's a reasonable assumption
100-
* and a low risk of leaking a tiny number of bytes if it doesn't hold.
101-
*/
102-
103-
if (processingThread == null && !context.isPreInitializing() && context.isInitialized() && !context.isFinalizing()) {
104-
createProcessingThread();
105-
}
106-
107-
}
108-
}
109-
110129
protected synchronized void remove(R ref) {
111130
if (ref.getNext() == ref) {
112131
// Already removed.

0 commit comments

Comments
 (0)