File tree Expand file tree Collapse file tree 4 files changed +514
-10
lines changed Expand file tree Collapse file tree 4 files changed +514
-10
lines changed Original file line number Diff line number Diff line change @@ -148,18 +148,32 @@ enum class ResolverContext
148
148
NotifyUnsubscribe,
149
149
};
150
150
151
- // Resume coroutine execution on a worker thread.
151
+ // Resume coroutine execution on a new worker thread any time co_await is called. This emulates the
152
+ // behavior of std::async when passing std::launch::async.
152
153
struct await_worker_thread : coro::suspend_always
153
154
{
154
- void await_suspend (coro::coroutine_handle<> h) const
155
- {
156
- std::thread (
157
- [](coro::coroutine_handle<>&& h) noexcept {
158
- h.resume ();
159
- },
160
- std::move (h))
161
- .detach ();
162
- }
155
+ GRAPHQLSERVICE_EXPORT void await_suspend (coro::coroutine_handle<> h) const ;
156
+ };
157
+
158
+ // Queue coroutine execution on a single dedicated worker thread any time co_await is called from
159
+ // the thread which created it.
160
+ struct await_worker_queue : coro::suspend_always
161
+ {
162
+ GRAPHQLSERVICE_EXPORT await_worker_queue ();
163
+ GRAPHQLSERVICE_EXPORT ~await_worker_queue ();
164
+
165
+ GRAPHQLSERVICE_EXPORT bool await_ready () const ;
166
+ GRAPHQLSERVICE_EXPORT void await_suspend (coro::coroutine_handle<> h);
167
+
168
+ private:
169
+ void resumePending ();
170
+
171
+ const std::thread::id _startId;
172
+ std::mutex _mutex {};
173
+ std::condition_variable _cv {};
174
+ std::list<coro::coroutine_handle<>> _pending {};
175
+ bool _shutdown = false ;
176
+ std::thread _worker;
163
177
};
164
178
165
179
// Type-erased awaitable.
Original file line number Diff line number Diff line change @@ -164,6 +164,72 @@ response::Value schema_exception::getErrors()
164
164
return buildErrorValues (std::move (_structuredErrors));
165
165
}
166
166
167
+ void await_worker_thread::await_suspend (coro::coroutine_handle<> h) const
168
+ {
169
+ std::thread (
170
+ [](coro::coroutine_handle<>&& h) {
171
+ h.resume ();
172
+ },
173
+ std::move (h))
174
+ .detach ();
175
+ }
176
+
177
+ await_worker_queue::await_worker_queue ()
178
+ : _startId { std::this_thread::get_id () }
179
+ , _worker { [this ]() {
180
+ resumePending ();
181
+ } }
182
+ {
183
+ }
184
+
185
+ await_worker_queue::~await_worker_queue ()
186
+ {
187
+ std::unique_lock lock { _mutex };
188
+
189
+ _shutdown = true ;
190
+ lock.unlock ();
191
+ _cv.notify_one ();
192
+
193
+ _worker.join ();
194
+ }
195
+
196
+ bool await_worker_queue::await_ready () const
197
+ {
198
+ return std::this_thread::get_id () != _startId;
199
+ }
200
+
201
+ void await_worker_queue::await_suspend (coro::coroutine_handle<> h)
202
+ {
203
+ std::unique_lock lock { _mutex };
204
+
205
+ _pending.push_back (std::move (h));
206
+ lock.unlock ();
207
+ _cv.notify_one ();
208
+ }
209
+
210
+ void await_worker_queue::resumePending ()
211
+ {
212
+ std::unique_lock lock { _mutex };
213
+
214
+ while (!_shutdown)
215
+ {
216
+ _cv.wait (lock, [this ]() {
217
+ return _shutdown || !_pending.empty ();
218
+ });
219
+
220
+ auto pending = std::move (_pending);
221
+
222
+ lock.unlock ();
223
+
224
+ for (auto h : pending)
225
+ {
226
+ h.resume ();
227
+ }
228
+
229
+ lock.lock ();
230
+ }
231
+ }
232
+
167
233
FieldParams::FieldParams (SelectionSetParams&& selectionSetParams, Directives directives)
168
234
: SelectionSetParams(std::move(selectionSetParams))
169
235
, fieldDirectives(std::move(directives))
Original file line number Diff line number Diff line change @@ -23,6 +23,14 @@ target_link_libraries(today_tests PRIVATE
23
23
add_bigobj_flag (today_tests )
24
24
gtest_add_tests (TARGET today_tests )
25
25
26
+ add_executable (coroutine_tests CoroutineTests.cpp )
27
+ target_link_libraries (coroutine_tests PRIVATE
28
+ todaygraphql
29
+ graphqljson
30
+ GTest::GTest
31
+ GTest::Main )
32
+ gtest_add_tests (TARGET coroutine_tests )
33
+
26
34
add_executable (client_tests ClientTests.cpp )
27
35
target_link_libraries (client_tests PRIVATE
28
36
todaygraphql
You can’t perform that action at this time.
0 commit comments