Skip to content

Commit 240277e

Browse files
authored
Add C++ wrapper around the proxying.h API (#16468)
Add a thin C++ wrapper API to proxying.h that wraps the underlying C API in the same file. The C++ wrapper is defined only when using C++ at least as recent as C++11. The wrapper handles creating and destroying the underlying em_proxying_queue objects automatically, defines proxying methods that take `std::function` rather than void*, and wraps `em_proxying_ctx` in a class with a `finish()` method.
1 parent bbb6bf9 commit 240277e

File tree

5 files changed

+266
-6
lines changed

5 files changed

+266
-6
lines changed

system/lib/pthread/proxying.h

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ void emscripten_proxy_execute_queue(em_proxying_queue* q);
4343
// of the task.
4444
typedef struct em_proxying_ctx em_proxying_ctx;
4545

46-
// Signal the end of a proxied task.
46+
// Signal the end of a task proxied with `emscripten_proxy_sync_with_ctx`.
4747
void emscripten_proxy_finish(em_proxying_ctx* ctx);
4848

4949
// Enqueue `func` on the given queue and thread and return immediately. Returns
@@ -64,13 +64,103 @@ int emscripten_proxy_sync(em_proxying_queue* q,
6464

6565
// Enqueue `func` on the given queue and thread and wait for it to be executed
6666
// and for the task to be marked finished with `emscripten_proxying_finish`
67-
// before returning. Returns 1 if the task was successfully completed and 0
68-
// otherwise.
67+
// before returning. `func` need not call `emscripten_proxying_finish` itself;
68+
// it could instead store the context pointer and call
69+
// `emscripten_proxying_finish` at an arbitrary later time. Returns 1 if the
70+
// task was successfully completed and 0 otherwise.
6971
int emscripten_proxy_sync_with_ctx(em_proxying_queue* q,
7072
pthread_t target_thread,
7173
void (*func)(em_proxying_ctx*, void*),
7274
void* arg);
7375

7476
#ifdef __cplusplus
75-
}
76-
#endif
77+
} // extern "C"
78+
79+
#if __cplusplus < 201103L
80+
#warning "C++ ProxyingQueue support requires building with -std=c++11 or newer!"
81+
#else
82+
83+
#include <functional>
84+
#include <thread>
85+
#include <utility>
86+
87+
namespace emscripten {
88+
89+
// A thin C++ wrapper around the underlying C API.
90+
class ProxyingQueue {
91+
em_proxying_queue* queue = em_proxying_queue_create();
92+
93+
static void runAndFree(void* arg) {
94+
auto f = (std::function<void()>*)arg;
95+
(*f)();
96+
delete f;
97+
}
98+
99+
static void run(void* arg) {
100+
auto f = *(std::function<void()>*)arg;
101+
f();
102+
}
103+
104+
static void runWithCtx(em_proxying_ctx* ctx, void* arg) {
105+
auto f = *(std::function<void(ProxyingCtx)>*)arg;
106+
f(ProxyingCtx{ctx});
107+
}
108+
109+
public:
110+
// ProxyingQueue can be moved but not copied. It is not valid to call any
111+
// methods on ProxyingQueues that have been moved out of.
112+
ProxyingQueue() = default;
113+
ProxyingQueue& operator=(const ProxyingQueue&) = delete;
114+
ProxyingQueue& operator=(ProxyingQueue&& other) {
115+
if (queue) {
116+
em_proxying_queue_destroy(queue);
117+
}
118+
queue = other.queue;
119+
other.queue = nullptr;
120+
return *this;
121+
}
122+
123+
ProxyingQueue(const ProxyingQueue&) = delete;
124+
ProxyingQueue(ProxyingQueue&& other) : queue(nullptr) {
125+
*this = std::move(other);
126+
}
127+
128+
~ProxyingQueue() {
129+
if (queue) {
130+
em_proxying_queue_destroy(queue);
131+
}
132+
}
133+
134+
// Simple wrapper around `em_proxying_ctx*` providing a `finish` method as an
135+
// alternative to `emscripten_proxy_finish`.
136+
struct ProxyingCtx {
137+
em_proxying_ctx* ctx;
138+
139+
ProxyingCtx(em_proxying_ctx* ctx) : ctx(ctx) {}
140+
void finish() { emscripten_proxy_finish(ctx); }
141+
};
142+
143+
void execute() { emscripten_proxy_execute_queue(queue); }
144+
145+
// Return true if the work was successfully enqueued and false otherwise.
146+
// Refer to the corresponding C API documentation.
147+
bool proxyAsync(pthread_t target, std::function<void()>&& func) {
148+
std::function<void()>* arg = new std::function<void()>(std::move(func));
149+
return emscripten_proxy_async(queue, target, runAndFree, (void*)arg);
150+
}
151+
152+
bool proxySync(const pthread_t target, const std::function<void()>& func) {
153+
return emscripten_proxy_sync(queue, target, run, (void*)&func);
154+
}
155+
156+
bool proxySyncWithCtx(const pthread_t target,
157+
const std::function<void(ProxyingCtx)>& func) {
158+
return emscripten_proxy_sync_with_ctx(
159+
queue, target, runWithCtx, (void*)&func);
160+
}
161+
};
162+
163+
} // namespace emscripten
164+
165+
#endif // __cplusplus < 201103L
166+
#endif // __cplusplus

tests/pthread/test_pthread_proxying.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,5 +351,4 @@ int main(int argc, char* argv[]) {
351351
test_proxying_queue_growth();
352352

353353
printf("done\n");
354-
emscripten_force_exit(0);
355354
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
#include <cassert>
2+
#include <condition_variable>
3+
#include <iostream>
4+
#include <sched.h>
5+
6+
#include "proxying.h"
7+
8+
using namespace emscripten;
9+
10+
// The worker threads we will use. `looper` sits in a loop, continuously
11+
// processing work as it becomes available, while `returner` returns to the JS
12+
// event loop each time it processes work.
13+
std::thread looper;
14+
std::thread returner;
15+
16+
// The queue used to send work to both `looper` and `returner`.
17+
ProxyingQueue queue;
18+
19+
// Whether `looper` should exit.
20+
std::atomic<bool> should_quit{false};
21+
22+
// Whether `returner` has spun up.
23+
std::atomic<bool> has_begun{false};
24+
25+
void looper_main() {
26+
while (!should_quit) {
27+
queue.execute();
28+
sched_yield();
29+
}
30+
}
31+
32+
void returner_main() {
33+
has_begun = 1;
34+
emscripten_exit_with_live_runtime();
35+
}
36+
37+
void test_proxy_async() {
38+
std::cout << "Testing async proxying\n";
39+
40+
int i = 0;
41+
42+
std::mutex mutex;
43+
std::condition_variable cond;
44+
std::thread::id executor;
45+
46+
// Proxy to ourselves.
47+
queue.proxyAsync(pthread_self(), [&]() {
48+
i = 1;
49+
executor = std::this_thread::get_id();
50+
});
51+
assert(i == 0);
52+
queue.execute();
53+
assert(i == 1);
54+
assert(executor == std::this_thread::get_id());
55+
56+
// Proxy to looper.
57+
{
58+
queue.proxyAsync(looper.native_handle(), [&]() {
59+
i = 2;
60+
executor = std::this_thread::get_id();
61+
cond.notify_one();
62+
});
63+
std::unique_lock<std::mutex> lock(mutex);
64+
cond.wait(lock, [&]() { return i == 2; });
65+
assert(executor == looper.get_id());
66+
}
67+
68+
// Proxy to returner.
69+
{
70+
queue.proxyAsync(returner.native_handle(), [&]() {
71+
i = 3;
72+
executor = std::this_thread::get_id();
73+
cond.notify_one();
74+
});
75+
std::unique_lock<std::mutex> lock(mutex);
76+
cond.wait(lock, [&]() { return i == 3; });
77+
assert(executor == returner.get_id());
78+
}
79+
}
80+
81+
void test_proxy_sync() {
82+
std::cout << "Testing sync proxying\n";
83+
84+
int i = 0;
85+
std::thread::id executor;
86+
87+
// Proxy to looper.
88+
{
89+
queue.proxySync(looper.native_handle(), [&]() {
90+
i = 2;
91+
executor = std::this_thread::get_id();
92+
});
93+
assert(i == 2);
94+
assert(executor == looper.get_id());
95+
}
96+
97+
// Proxy to returner.
98+
{
99+
queue.proxySync(returner.native_handle(), [&]() {
100+
i = 3;
101+
executor = std::this_thread::get_id();
102+
});
103+
assert(i == 3);
104+
assert(executor == returner.get_id());
105+
}
106+
}
107+
108+
void test_proxy_sync_with_ctx(void) {
109+
std::cout << "Testing sync proxying\n";
110+
111+
int i = 0;
112+
std::thread::id executor;
113+
114+
// Proxy to looper.
115+
{
116+
queue.proxySyncWithCtx(looper.native_handle(), [&](auto ctx) {
117+
i = 2;
118+
executor = std::this_thread::get_id();
119+
ctx.finish();
120+
});
121+
assert(i == 2);
122+
assert(executor == looper.get_id());
123+
}
124+
125+
// Proxy to returner.
126+
{
127+
queue.proxySyncWithCtx(returner.native_handle(), [&](auto ctx) {
128+
i = 3;
129+
executor = std::this_thread::get_id();
130+
auto finish = (void(*)(void*))emscripten_proxy_finish;
131+
emscripten_async_call(finish, ctx.ctx, 0);
132+
});
133+
assert(i == 3);
134+
assert(executor == returner.get_id());
135+
}
136+
}
137+
138+
int main(int argc, char* argv[]) {
139+
looper = std::thread(looper_main);
140+
returner = std::thread(returner_main);
141+
142+
// `returner` can't process its queue until it starts up.
143+
while (!has_begun) {
144+
sched_yield();
145+
}
146+
147+
test_proxy_async();
148+
test_proxy_sync();
149+
test_proxy_sync_with_ctx();
150+
151+
should_quit = true;
152+
looper.join();
153+
154+
pthread_cancel(returner.native_handle());
155+
returner.join();
156+
157+
std::cout << "done\n";
158+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Testing async proxying
2+
Testing sync proxying
3+
Testing sync proxying
4+
done

tests/test_core.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2578,6 +2578,15 @@ def test_pthread_proxying(self):
25782578
self.do_run_in_out_file_test('pthread/test_pthread_proxying.c',
25792579
emcc_args=args, interleaved_output=False)
25802580

2581+
@node_pthreads
2582+
def test_pthread_proxying_cpp(self):
2583+
self.set_setting('EXIT_RUNTIME')
2584+
self.set_setting('PROXY_TO_PTHREAD')
2585+
self.set_setting('INITIAL_MEMORY=32mb')
2586+
args = [f'-I{path_from_root("system/lib/pthread")}']
2587+
self.do_run_in_out_file_test('pthread/test_pthread_proxying_cpp.cpp',
2588+
emcc_args=args, interleaved_output=False)
2589+
25812590
@node_pthreads
25822591
def test_pthread_proxying_dropped_work(self):
25832592
self.set_setting('EXIT_RUNTIME')

0 commit comments

Comments
 (0)