Skip to content

Commit eea4056

Browse files
authored
Add refcounting and a zombie list to em_proxying_queue (#16524)
When work is added to a proxying queue, it sends a message to the target thread's JS event loop. Those messages contain references to the queues that sent them, so to prevent use-after-free bugs, we need to defer freeing the queues until after all their outstanding messages have been processed. Add a reference count to each queue to track how many outstanding messages there are and prevent the queues from being freed too early. Furthermore, a thread may not return to its event loop to process messages until after the thread has died. In that case, the thread cannot free the queue because it has no stack to call `free` on. To safely support this use case, also add a list of zombie queues, i.e. queues that have been destroyed but still have nonzero reference counts. Make the queue constructor traverse the zombie list and free any zombie queues whose reference counts have gone to zero. For more details, see the new documentation comment in proxying.c.
1 parent 3042235 commit eea4056

11 files changed

+247
-17
lines changed

emcc.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1519,7 +1519,6 @@ def setup_pthreads(target):
15191519
'__emscripten_thread_exit',
15201520
'__emscripten_thread_crashed',
15211521
'_emscripten_tls_init',
1522-
'_emscripten_current_thread_process_queued_calls',
15231522
'_pthread_self',
15241523
]
15251524
settings.EXPORTED_FUNCTIONS += worker_imports

src/library_pthread.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ var LibraryPThread = {
270270
if (cmd === 'processProxyingQueue') {
271271
// TODO: Must post message to main Emscripten thread in PROXY_TO_WORKER mode.
272272
_emscripten_proxy_execute_queue(d['queue']);
273+
// Decrement the ref count
274+
Atomics.sub(HEAP32, d['queue'] >> 2, 1);
273275
} else if (cmd === 'spawnThread') {
274276
spawnThread(d);
275277
} else if (cmd === 'cleanupThread') {
@@ -1054,9 +1056,13 @@ var LibraryPThread = {
10541056
setTimeout(() => {
10551057
// Only execute the queue if we have a live pthread runtime. We
10561058
// implement pthread_self to return 0 if there is no live runtime.
1059+
// TODO: Use `callUserCallback` to correctly handle unwinds, etc. once
1060+
// `runtimeExited` is correctly unset on workers.
10571061
if (_pthread_self()) {
10581062
_emscripten_proxy_execute_queue(queue);
10591063
}
1064+
// Decrement the ref count
1065+
Atomics.sub(HEAP32, queue >> 2, 1);
10601066
});
10611067
} else if (ENVIRONMENT_IS_PTHREAD) {
10621068
postMessage({'targetThread' : targetThreadId, 'cmd' : 'processProxyingQueue', 'queue' : queue});

src/worker.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,8 @@ self.onmessage = (e) => {
289289
if (Module['_pthread_self']()) { // If this thread is actually running?
290290
Module['_emscripten_proxy_execute_queue'](e.data.queue);
291291
}
292+
// Decrement the ref count
293+
Atomics.sub(HEAP32, e.data.queue >> 2, 1);
292294
} else {
293295
err('worker.js received unknown command ' + e.data.cmd);
294296
err(e.data);

system/lib/pthread/proxying.c

Lines changed: 110 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,49 @@
1515

1616
#define TASK_QUEUE_INITIAL_CAPACITY 128
1717

18+
// Proxy Queue Lifetime Management
19+
// -------------------------------
20+
//
21+
// Proxied tasks are executed either when the user manually calls
22+
// `emscripten_proxy_execute_queue` on the target thread or when the target
23+
// thread returns to the event loop. The queue does not know which execution
24+
// path will be used ahead of time when the work is proxied, so it must
25+
// conservatively send a message to the target thread's event loop in case the
26+
// user expects the event loop to drive the execution. These notifications
27+
// contain references to the queue that will be dereferenced when the target
28+
// thread returns to its event loop and receives the notification, even if the
29+
// user manages the execution of the queue themselves.
30+
//
31+
// To avoid use-after-free bugs, we cannot free a queue immediately when a user
32+
// calls `em_proxying_queue_destroy`; instead, we have to defer freeing the
33+
// queue until all of its outstanding notifications have been processed. We
34+
// defer freeing the queue using a reference counting scheme. Each time a
35+
// notification containing a reference to the queue is generated, we increase
36+
// the reference count and each time one of the notifications is received and
37+
// processed, we decrease the reference count. The queue can only be freed once
38+
// `em_proxying_queue_destroy` has been called and the reference count has
39+
// reached zero.
40+
//
41+
// But an extra complication is that the target thread may have died by the time
42+
// it gets back to its event loop to process its notifications. This can happen
43+
// when a user proxies some work to a thread, then calls
44+
// `emscripten_proxy_execute_queue` on that thread, then destroys the queue and
45+
// exits the thread. In that situation no work will be dropped, but the thread's
46+
// worker will still receive a notification and have to decrease the reference
47+
// count without a live runtime. Without a live runtime, there is no stack, so
48+
// the worker cannot safely free the queue at this point even if the refcount
49+
// goes to zero. We need a separate thread with a live runtime to perform the
50+
// free.
51+
//
52+
// To ensure that queues are eventually freed, we place destroyed queues in a
53+
// global "zombie list" where they wait for their refcounts to fall to zero. The
54+
// zombie list is scanned whenever a new queue is constructed and any of the
55+
// zombie queues with zero refcounts are freed. In principle the zombie list
56+
// could be scanned at any time, but the queue constructor is a nice place to do
57+
// it because scanning there is sufficient to keep the number of zombie queues
58+
// from growing without bound; creating a new zombie ultimately requires
59+
// creating a new queue.
60+
1861
extern int _emscripten_notify_proxying_queue(pthread_t target_thread,
1962
pthread_t curr_thread,
2063
pthread_t main_thread,
@@ -120,15 +163,25 @@ static task task_queue_dequeue(task_queue* tasks) {
120163
}
121164

122165
struct em_proxying_queue {
123-
// Protects all accesses to all task_queues.
166+
// The number of references to this queue that exist in JS event queues.
167+
// Decremented directly from JS, so this must be the first field.
168+
_Atomic int js_refcount;
169+
// Doubly linked list pointers for the zombie list.
170+
em_proxying_queue* zombie_prev;
171+
em_proxying_queue* zombie_next;
172+
// Protects all accesses to task_queues, size, and capacity.
124173
pthread_mutex_t mutex;
125174
// `size` task queues stored in an array of size `capacity`.
126175
task_queue* task_queues;
127176
int size;
128177
int capacity;
129178
};
130179

131-
static em_proxying_queue system_proxying_queue = {.mutex =
180+
// The system proxying queue.
181+
static em_proxying_queue system_proxying_queue = {.js_refcount = 0,
182+
.zombie_prev = NULL,
183+
.zombie_next = NULL,
184+
.mutex =
132185
PTHREAD_MUTEX_INITIALIZER,
133186
.task_queues = NULL,
134187
.size = 0,
@@ -138,12 +191,51 @@ em_proxying_queue* emscripten_proxy_get_system_queue(void) {
138191
return &system_proxying_queue;
139192
}
140193

194+
// The head of the zombie list. Its mutex protects access to the list and its
195+
// other fields are not used..
196+
static em_proxying_queue zombie_list_head = {.zombie_prev = &zombie_list_head,
197+
.zombie_next = &zombie_list_head,
198+
.mutex =
199+
PTHREAD_MUTEX_INITIALIZER};
200+
201+
static void em_proxying_queue_free(em_proxying_queue* q) {
202+
pthread_mutex_destroy(&q->mutex);
203+
for (int i = 0; i < q->size; i++) {
204+
task_queue_deinit(&q->task_queues[i]);
205+
}
206+
free(q->task_queues);
207+
free(q);
208+
}
209+
210+
static void cull_zombies() {
211+
pthread_mutex_lock(&zombie_list_head.mutex);
212+
em_proxying_queue* curr = zombie_list_head.zombie_next;
213+
while (curr != &zombie_list_head) {
214+
em_proxying_queue* next = curr->zombie_next;
215+
if (curr->js_refcount == 0) {
216+
// Remove the zombie from the list and free it.
217+
curr->zombie_prev->zombie_next = curr->zombie_next;
218+
curr->zombie_next->zombie_prev = curr->zombie_prev;
219+
em_proxying_queue_free(curr);
220+
}
221+
curr = next;
222+
}
223+
pthread_mutex_unlock(&zombie_list_head.mutex);
224+
}
225+
141226
em_proxying_queue* em_proxying_queue_create(void) {
227+
// Free any queue that has been destroyed and is safe to free.
228+
cull_zombies();
229+
230+
// Allocate the new queue.
142231
em_proxying_queue* q = malloc(sizeof(em_proxying_queue));
143232
if (q == NULL) {
144233
return NULL;
145234
}
146-
*q = (em_proxying_queue){.mutex = PTHREAD_MUTEX_INITIALIZER,
235+
*q = (em_proxying_queue){.js_refcount = 0,
236+
.zombie_prev = NULL,
237+
.zombie_next = NULL,
238+
.mutex = PTHREAD_MUTEX_INITIALIZER,
147239
.task_queues = NULL,
148240
.size = 0,
149241
.capacity = 0};
@@ -153,14 +245,21 @@ em_proxying_queue* em_proxying_queue_create(void) {
153245
void em_proxying_queue_destroy(em_proxying_queue* q) {
154246
assert(q != NULL);
155247
assert(q != &system_proxying_queue && "cannot destroy system proxying queue");
156-
// No need to acquire the lock; no one should be racing with the destruction
157-
// of the queue.
158-
pthread_mutex_destroy(&q->mutex);
159-
for (int i = 0; i < q->size; i++) {
160-
task_queue_deinit(&q->task_queues[i]);
248+
assert(!q->zombie_next && !q->zombie_prev &&
249+
"double freeing em_proxying_queue!");
250+
if (q->js_refcount == 0) {
251+
// No outstanding references to the queue, so we can go ahead and free it.
252+
em_proxying_queue_free(q);
253+
return;
161254
}
162-
free(q->task_queues);
163-
free(q);
255+
// Otherwise add the queue to the zombie list so that it will eventually be
256+
// freed safely.
257+
pthread_mutex_lock(&zombie_list_head.mutex);
258+
q->zombie_next = zombie_list_head.zombie_next;
259+
q->zombie_prev = &zombie_list_head;
260+
q->zombie_next->zombie_prev = q;
261+
q->zombie_prev->zombie_next = q;
262+
pthread_mutex_unlock(&zombie_list_head.mutex);
164263
}
165264

166265
// Not thread safe. Returns -1 if there are no tasks for the thread.
@@ -268,6 +367,7 @@ int emscripten_proxy_async(em_proxying_queue* q,
268367
// Otherwise, the target thread was already notified when the existing work
269368
// was enqueued so we don't need to notify it again.
270369
if (empty) {
370+
q->js_refcount++;
271371
_emscripten_notify_proxying_queue(
272372
target_thread, pthread_self(), emscripten_main_browser_thread_id(), q);
273373
}

tests/other/metadce/minimal_main_Oz_USE_PTHREADS_PROXY_TO_PTHREAD.exports

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ B
33
C
44
D
55
E
6-
F
76
o
87
p
98
q

tests/other/metadce/minimal_main_Oz_USE_PTHREADS_PROXY_TO_PTHREAD.funcs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ $__pthread_setcancelstate
1010
$__set_thread_state
1111
$__stdio_write
1212
$__timedwait
13-
$__wake
13+
$__wake.2
1414
$__wasi_syscall_ret
1515
$__wasm_call_ctors
1616
$__wasm_init_memory
@@ -24,7 +24,7 @@ $_main_thread
2424
$a_cas
2525
$a_cas_p.1
2626
$a_dec
27-
$a_fetch_add
27+
$a_fetch_add.1
2828
$a_inc
2929
$a_swap
3030
$add
@@ -35,7 +35,6 @@ $dlmemalign
3535
$do_dispatch_to_thread
3636
$em_queued_call_malloc
3737
$emscripten_async_run_in_main_thread
38-
$emscripten_current_thread_process_queued_calls
3938
$emscripten_futex_wait
4039
$emscripten_futex_wake
4140
$emscripten_main_thread_process_queued_calls
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
31848
1+
31638
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
18582
1+
18584
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
#include <assert.h>
2+
#include <emscripten/console.h>
3+
#include <emscripten/emscripten.h>
4+
#include <pthread.h>
5+
#include <stdbool.h>
6+
#include <unistd.h>
7+
8+
#include "proxying.h"
9+
10+
// The first two queues will be zombies and the next two will be created just to
11+
// cull the zombies.
12+
em_proxying_queue* queues[4];
13+
14+
#ifndef SANITIZER
15+
16+
// If we are not using sanitizers (which need to use their own allocators),
17+
// override free so we can track when queues are actually freed.
18+
19+
int queues_freed[4] = {};
20+
21+
extern void emscripten_builtin_free(void* mem);
22+
void __attribute__((noinline)) free(void* ptr) {
23+
for (int i = 0; i < 4; i++) {
24+
if (ptr && queues[i] == ptr) {
25+
queues_freed[i] = 1;
26+
queues[i] = NULL;
27+
break;
28+
}
29+
}
30+
emscripten_builtin_free(ptr);
31+
}
32+
33+
#endif // SANITIZER
34+
35+
_Atomic int should_execute = 0;
36+
_Atomic int executed[2] = {};
37+
38+
void task(void* arg) { *(_Atomic int*)arg = 1; }
39+
40+
void* execute_and_free_queue(void* arg) {
41+
// Wait until we are signaled to execute the queue.
42+
while (!should_execute) {
43+
}
44+
45+
// Execute the proxied work then free the empty queues.
46+
for (int i = 0; i < 2; i++) {
47+
emscripten_proxy_execute_queue(queues[i]);
48+
em_proxying_queue_destroy(queues[i]);
49+
}
50+
51+
// Exit with a live runtime so the queued work notification is received and we
52+
// try to execute the queue again, even though it has been destroyed.
53+
emscripten_exit_with_live_runtime();
54+
}
55+
56+
int main() {
57+
emscripten_console_log("start");
58+
for (int i = 0; i < 2; i++) {
59+
queues[i] = em_proxying_queue_create();
60+
assert(queues[i]);
61+
}
62+
63+
// Create the worker and send it tasks.
64+
pthread_t worker;
65+
pthread_create(&worker, NULL, execute_and_free_queue, NULL);
66+
for (int i = 0; i < 2; i++) {
67+
emscripten_proxy_async(queues[i], worker, task, &executed[i]);
68+
}
69+
should_execute = 1;
70+
71+
// Wait for the tasks to be executed.
72+
while (!executed[0] || !executed[1]) {
73+
}
74+
75+
// Break the queue abstraction to wait for the refcounts to be decreased.
76+
while (*(_Atomic int*)queues[0] || *(_Atomic int*)queues[1]) {
77+
}
78+
79+
#ifndef SANITIZER
80+
// Our zombies should not have been freed yet.
81+
assert(!queues_freed[0]);
82+
assert(!queues_freed[1]);
83+
#endif // SANITIZER
84+
85+
// Cull the zombies!
86+
queues[2] = em_proxying_queue_create();
87+
88+
#ifndef SANITIZER
89+
// Now they should be free.
90+
assert(queues_freed[0]);
91+
assert(queues_freed[1]);
92+
assert(!queues_freed[2]);
93+
#endif // SANITIZER
94+
95+
em_proxying_queue_destroy(queues[2]);
96+
97+
#ifndef SANITIZER
98+
// The new queue should have been immediately freed.
99+
assert(queues_freed[2]);
100+
#endif // SANITIZER
101+
102+
// Cull again, but this time there should be nothing to cull.
103+
queues[3] = em_proxying_queue_create();
104+
em_proxying_queue_destroy(queues[3]);
105+
106+
#ifndef SANITIZER
107+
// The new queue should have been immediately freed.
108+
assert(queues_freed[3]);
109+
#endif // SANITIZER
110+
111+
emscripten_console_log("done");
112+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
start
2+
done

0 commit comments

Comments
 (0)