Skip to content

Commit b3c662e

Browse files
authored
Merge pull request #204 from wravery/main
Make Request/Operations methods thread-safe and test custom awaitables
2 parents 5e01ddb + 45574c2 commit b3c662e

File tree

142 files changed

+949
-418
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

142 files changed

+949
-418
lines changed

doc/awaitable.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ specifically a type-erased `graphql::service::await_async` class in
88
[GraphQLService.h](../include/graphqlservice/GraphQLService.h):
99
```cpp
1010
// Type-erased awaitable.
11-
class await_async : public coro::suspend_always
11+
class await_async final
1212
{
1313
private:
1414
struct Concept
@@ -31,17 +31,17 @@ public:
3131

3232
// Default to immediate synchronous execution.
3333
await_async()
34-
: _pimpl { std::static_pointer_cast<Concept>(
34+
: _pimpl { std::static_pointer_cast<const Concept>(
3535
std::make_shared<Model<coro::suspend_never>>(std::make_shared<coro::suspend_never>())) }
3636
{
3737
}
3838

3939
// Implicitly convert a std::launch parameter used with std::async to an awaitable.
4040
await_async(std::launch launch)
4141
: _pimpl { ((launch & std::launch::async) == std::launch::async)
42-
? std::static_pointer_cast<Concept>(std::make_shared<Model<await_worker_thread>>(
42+
? std::static_pointer_cast<const Concept>(std::make_shared<Model<await_worker_thread>>(
4343
std::make_shared<await_worker_thread>()))
44-
: std::static_pointer_cast<Concept>(std::make_shared<Model<coro::suspend_never>>(
44+
: std::static_pointer_cast<const Concept>(std::make_shared<Model<coro::suspend_never>>(
4545
std::make_shared<coro::suspend_never>())) }
4646
{
4747
}

doc/json.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ to some other output mechanism, without building a single string buffer for the
4646
document in memory. For example, you might use this to write directly to a buffered IPC pipe
4747
or network connection:
4848
```cpp
49-
class Writer
49+
class Writer final
5050
{
5151
private:
5252
struct Concept
@@ -71,7 +71,7 @@ private:
7171
public:
7272
template <class T>
7373
Writer(std::unique_ptr<T> writer)
74-
: _concept { std::static_pointer_cast<Concept>(
74+
: _concept { std::static_pointer_cast<const Concept>(
7575
std::make_shared<Model<T>>(std::move(writer))) }
7676
{
7777
}

include/graphqlservice/GraphQLResponse.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ GRAPHQLRESPONSE_EXPORT IdType Value::release<IdType>();
269269

270270
using AwaitableValue = internal::Awaitable<Value>;
271271

272-
class Writer
272+
class Writer final
273273
{
274274
private:
275275
struct Concept
@@ -357,7 +357,7 @@ class Writer
357357
public:
358358
template <class T>
359359
Writer(std::unique_ptr<T> writer)
360-
: _concept { std::static_pointer_cast<Concept>(
360+
: _concept { std::static_pointer_cast<const Concept>(
361361
std::make_shared<Model<T>>(std::move(writer))) }
362362
{
363363
}

include/graphqlservice/GraphQLService.h

Lines changed: 41 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -148,22 +148,36 @@ enum class ResolverContext
148148
NotifyUnsubscribe,
149149
};
150150

151-
// Resume coroutine execution on a worker thread.
151+
// Resume coroutine execution on a new worker thread any time co_await is called. This emulates the
152+
// behavior of std::async when passing std::launch::async.
152153
struct await_worker_thread : coro::suspend_always
153154
{
154-
void await_suspend(coro::coroutine_handle<> h) const
155-
{
156-
std::thread(
157-
[](coro::coroutine_handle<>&& h) noexcept {
158-
h.resume();
159-
},
160-
std::move(h))
161-
.detach();
162-
}
155+
GRAPHQLSERVICE_EXPORT void await_suspend(coro::coroutine_handle<> h) const;
156+
};
157+
158+
// Queue coroutine execution on a single dedicated worker thread any time co_await is called from
159+
// the thread which created it.
160+
struct await_worker_queue : coro::suspend_always
161+
{
162+
GRAPHQLSERVICE_EXPORT await_worker_queue();
163+
GRAPHQLSERVICE_EXPORT ~await_worker_queue();
164+
165+
GRAPHQLSERVICE_EXPORT bool await_ready() const;
166+
GRAPHQLSERVICE_EXPORT void await_suspend(coro::coroutine_handle<> h);
167+
168+
private:
169+
void resumePending();
170+
171+
const std::thread::id _startId;
172+
std::mutex _mutex {};
173+
std::condition_variable _cv {};
174+
std::list<coro::coroutine_handle<>> _pending {};
175+
bool _shutdown = false;
176+
std::thread _worker;
163177
};
164178

165179
// Type-erased awaitable.
166-
class await_async : public coro::suspend_always
180+
class await_async final
167181
{
168182
private:
169183
struct Concept
@@ -202,7 +216,7 @@ class await_async : public coro::suspend_always
202216
std::shared_ptr<T> _pimpl;
203217
};
204218

205-
const std::shared_ptr<Concept> _pimpl;
219+
const std::shared_ptr<const Concept> _pimpl;
206220

207221
public:
208222
// Type-erased explicit constructor for a custom awaitable.
@@ -213,36 +227,14 @@ class await_async : public coro::suspend_always
213227
}
214228

215229
// Default to immediate synchronous execution.
216-
await_async()
217-
: _pimpl { std::static_pointer_cast<Concept>(
218-
std::make_shared<Model<coro::suspend_never>>(std::make_shared<coro::suspend_never>())) }
219-
{
220-
}
230+
GRAPHQLSERVICE_EXPORT await_async();
221231

222232
// Implicitly convert a std::launch parameter used with std::async to an awaitable.
223-
await_async(std::launch launch)
224-
: _pimpl { ((launch & std::launch::async) == std::launch::async)
225-
? std::static_pointer_cast<Concept>(std::make_shared<Model<await_worker_thread>>(
226-
std::make_shared<await_worker_thread>()))
227-
: std::static_pointer_cast<Concept>(std::make_shared<Model<coro::suspend_never>>(
228-
std::make_shared<coro::suspend_never>())) }
229-
{
230-
}
231-
232-
bool await_ready() const
233-
{
234-
return _pimpl->await_ready();
235-
}
233+
GRAPHQLSERVICE_EXPORT await_async(std::launch launch);
236234

237-
void await_suspend(coro::coroutine_handle<> h) const
238-
{
239-
_pimpl->await_suspend(std::move(h));
240-
}
241-
242-
void await_resume() const
243-
{
244-
_pimpl->await_resume();
245-
}
235+
GRAPHQLSERVICE_EXPORT bool await_ready() const;
236+
GRAPHQLSERVICE_EXPORT void await_suspend(coro::coroutine_handle<> h) const;
237+
GRAPHQLSERVICE_EXPORT void await_resume() const;
246238
};
247239

248240
// Directive order matters, and some of them are repeatable. So rather than passing them in a
@@ -826,7 +818,7 @@ struct ModifiedResult
826818
typename std::conditional_t<std::is_base_of_v<Object, U>, std::shared_ptr<U>, U>;
827819

828820
using future_type = typename std::conditional_t<std::is_base_of_v<Object, U>,
829-
AwaitableObject<std::shared_ptr<Object>>, AwaitableScalar<type>>;
821+
AwaitableObject<std::shared_ptr<const Object>>, AwaitableScalar<type>>;
830822
};
831823

832824
// Convert a single value of the specified type to JSON.
@@ -849,7 +841,7 @@ struct ModifiedResult
849841
co_await params.launch;
850842

851843
auto awaitedResult = co_await ModifiedResult<Object>::convert(
852-
std::static_pointer_cast<Object>(co_await result),
844+
std::static_pointer_cast<const Object>(co_await result),
853845
std::move(params));
854846

855847
co_return std::move(awaitedResult);
@@ -1155,7 +1147,7 @@ GRAPHQLSERVICE_EXPORT AwaitableResolver ModifiedResult<response::Value>::convert
11551147
AwaitableScalar<response::Value> result, ResolverParams params);
11561148
template <>
11571149
GRAPHQLSERVICE_EXPORT AwaitableResolver ModifiedResult<Object>::convert(
1158-
AwaitableObject<std::shared_ptr<Object>> result, ResolverParams params);
1150+
AwaitableObject<std::shared_ptr<const Object>> result, ResolverParams params);
11591151

11601152
// Export all of the scalar value validation methods
11611153
template <>
@@ -1260,10 +1252,10 @@ struct RequestDeliverParams
12601252
await_async launch {};
12611253

12621254
// Optional override for the default Subscription operation object.
1263-
std::shared_ptr<Object> subscriptionObject {};
1255+
std::shared_ptr<const Object> subscriptionObject {};
12641256
};
12651257

1266-
using TypeMap = internal::string_view_map<std::shared_ptr<Object>>;
1258+
using TypeMap = internal::string_view_map<std::shared_ptr<const Object>>;
12671259

12681260
// State which is captured and kept alive until all pending futures have been resolved for an
12691261
// operation. Note: SelectionSet is the other parameter that gets passed to the top level Object,
@@ -1328,12 +1320,14 @@ class Request : public std::enable_shared_from_this<Request>
13281320
private:
13291321
SubscriptionKey addSubscription(RequestSubscribeParams&& params);
13301322
void removeSubscription(SubscriptionKey key);
1331-
std::vector<std::shared_ptr<SubscriptionData>> collectRegistrations(
1323+
std::vector<std::shared_ptr<const SubscriptionData>> collectRegistrations(
13321324
std::string_view field, RequestDeliverFilter&& filter) const noexcept;
13331325

13341326
const TypeMap _operations;
1335-
std::unique_ptr<ValidateExecutableVisitor> _validation;
1336-
internal::sorted_map<SubscriptionKey, std::shared_ptr<SubscriptionData>> _subscriptions;
1327+
mutable std::mutex _validationMutex {};
1328+
const std::unique_ptr<ValidateExecutableVisitor> _validation;
1329+
mutable std::mutex _subscriptionMutex {};
1330+
internal::sorted_map<SubscriptionKey, std::shared_ptr<const SubscriptionData>> _subscriptions;
13371331
internal::sorted_map<SubscriptionName, internal::sorted_set<SubscriptionKey>> _listeners;
13381332
SubscriptionKey _nextKey = 0;
13391333
};

include/graphqlservice/introspection/DirectiveObject.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace graphql::introspection::object {
1414

15-
class Directive
15+
class Directive final
1616
: public service::Object
1717
{
1818
private:
@@ -73,7 +73,7 @@ class Directive
7373
const std::shared_ptr<T> _pimpl;
7474
};
7575

76-
const std::unique_ptr<Concept> _pimpl;
76+
const std::unique_ptr<const Concept> _pimpl;
7777

7878
service::TypeNames getTypeNames() const noexcept;
7979
service::ResolverMap getResolvers() const noexcept;

include/graphqlservice/introspection/EnumValueObject.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace graphql::introspection::object {
1414

15-
class EnumValue
15+
class EnumValue final
1616
: public service::Object
1717
{
1818
private:
@@ -66,7 +66,7 @@ class EnumValue
6666
const std::shared_ptr<T> _pimpl;
6767
};
6868

69-
const std::unique_ptr<Concept> _pimpl;
69+
const std::unique_ptr<const Concept> _pimpl;
7070

7171
service::TypeNames getTypeNames() const noexcept;
7272
service::ResolverMap getResolvers() const noexcept;

include/graphqlservice/introspection/FieldObject.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace graphql::introspection::object {
1414

15-
class Field
15+
class Field final
1616
: public service::Object
1717
{
1818
private:
@@ -80,7 +80,7 @@ class Field
8080
const std::shared_ptr<T> _pimpl;
8181
};
8282

83-
const std::unique_ptr<Concept> _pimpl;
83+
const std::unique_ptr<const Concept> _pimpl;
8484

8585
service::TypeNames getTypeNames() const noexcept;
8686
service::ResolverMap getResolvers() const noexcept;

include/graphqlservice/introspection/InputValueObject.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace graphql::introspection::object {
1414

15-
class InputValue
15+
class InputValue final
1616
: public service::Object
1717
{
1818
private:
@@ -66,7 +66,7 @@ class InputValue
6666
const std::shared_ptr<T> _pimpl;
6767
};
6868

69-
const std::unique_ptr<Concept> _pimpl;
69+
const std::unique_ptr<const Concept> _pimpl;
7070

7171
service::TypeNames getTypeNames() const noexcept;
7272
service::ResolverMap getResolvers() const noexcept;

include/graphqlservice/introspection/SchemaObject.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace graphql::introspection::object {
1414

15-
class Schema
15+
class Schema final
1616
: public service::Object
1717
{
1818
private:
@@ -80,7 +80,7 @@ class Schema
8080
const std::shared_ptr<T> _pimpl;
8181
};
8282

83-
const std::unique_ptr<Concept> _pimpl;
83+
const std::unique_ptr<const Concept> _pimpl;
8484

8585
service::TypeNames getTypeNames() const noexcept;
8686
service::ResolverMap getResolvers() const noexcept;

include/graphqlservice/introspection/TypeObject.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace graphql::introspection::object {
1414

15-
class Type
15+
class Type final
1616
: public service::Object
1717
{
1818
private:
@@ -108,7 +108,7 @@ class Type
108108
const std::shared_ptr<T> _pimpl;
109109
};
110110

111-
const std::unique_ptr<Concept> _pimpl;
111+
const std::unique_ptr<const Concept> _pimpl;
112112

113113
service::TypeNames getTypeNames() const noexcept;
114114
service::ResolverMap getResolvers() const noexcept;

0 commit comments

Comments
 (0)