Skip to content

Commit fd09323

Browse files
authored
Merge pull request #180 from wravery/next
Implement delivery by SubscriptionKey and a type-erased JSON writer interface
2 parents 80dba3a + 5291b0f commit fd09323

File tree

13 files changed

+825
-612
lines changed

13 files changed

+825
-612
lines changed

doc/subscriptions.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,20 @@ Subscriptions are created or removed by calling the `Request::subscribe`
1414
and `Request::unsubscribe` methods in [GraphQLService.h](../include/graphqlservice/GraphQLService.h):
1515
```cpp
1616
GRAPHQLSERVICE_EXPORT SubscriptionKey subscribe(
17-
SubscriptionParams&& params, SubscriptionCallback&& callback);
17+
RequestSubscribeParams&& params, SubscriptionCallback&& callback);
1818
GRAPHQLSERVICE_EXPORT AwaitableSubscribe subscribe(
19-
std::launch launch, SubscriptionParams&& params, SubscriptionCallback&& callback);
19+
std::launch launch, RequestSubscribeParams&& params, SubscriptionCallback&& callback);
2020

2121
GRAPHQLSERVICE_EXPORT void unsubscribe(SubscriptionKey key);
2222
GRAPHQLSERVICE_EXPORT AwaitableUnsubscribe unsubscribe(std::launch launch, SubscriptionKey key);
2323
```
24-
You need to fill in a `SubscriptionParams` struct with the [parsed](./parsing.md)
24+
You need to fill in a `RequestSubscribeParams` struct with the [parsed](./parsing.md)
2525
query and any other relevant operation parameters:
2626
```cpp
2727
// You can still sub-class RequestState and use that in the state parameter to Request::subscribe
2828
// to add your own state to the service callbacks that you receive while executing the subscription
2929
// query.
30-
struct SubscriptionParams
30+
struct RequestSubscribeParams
3131
{
3232
std::shared_ptr<RequestState> state;
3333
peg::ast query;

include/graphqlservice/GraphQLResponse.h

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,102 @@ GRAPHQLRESPONSE_EXPORT IdType Value::release<IdType>();
263263

264264
using AwaitableValue = internal::Awaitable<Value>;
265265

266+
class Writer
267+
{
268+
private:
269+
struct Concept
270+
{
271+
virtual ~Concept() = default;
272+
273+
virtual void start_object() const = 0;
274+
virtual void add_member(const std::string& key) const = 0;
275+
virtual void end_object() const = 0;
276+
277+
virtual void start_array() const = 0;
278+
virtual void end_arrary() const = 0;
279+
280+
virtual void write_null() const = 0;
281+
virtual void write_string(const std::string& value) const = 0;
282+
virtual void write_bool(bool value) const = 0;
283+
virtual void write_int(int value) const = 0;
284+
virtual void write_float(double value) const = 0;
285+
};
286+
287+
template <class T>
288+
struct Model : Concept
289+
{
290+
Model(std::unique_ptr<T>&& pimpl)
291+
: _pimpl { std::move(pimpl) }
292+
{
293+
}
294+
295+
void start_object() const final
296+
{
297+
_pimpl->start_object();
298+
}
299+
300+
void add_member(const std::string& key) const final
301+
{
302+
_pimpl->add_member(key);
303+
}
304+
305+
void end_object() const final
306+
{
307+
_pimpl->end_object();
308+
}
309+
310+
void start_array() const final
311+
{
312+
_pimpl->start_array();
313+
}
314+
315+
void end_arrary() const final
316+
{
317+
_pimpl->end_arrary();
318+
}
319+
320+
void write_null() const final
321+
{
322+
_pimpl->write_null();
323+
}
324+
325+
void write_string(const std::string& value) const final
326+
{
327+
_pimpl->write_string(value);
328+
}
329+
330+
void write_bool(bool value) const final
331+
{
332+
_pimpl->write_bool(value);
333+
}
334+
335+
void write_int(int value) const final
336+
{
337+
_pimpl->write_int(value);
338+
}
339+
340+
void write_float(double value) const final
341+
{
342+
_pimpl->write_float(value);
343+
}
344+
345+
private:
346+
std::unique_ptr<T> _pimpl;
347+
};
348+
349+
const std::shared_ptr<const Concept> _concept;
350+
351+
public:
352+
template <class T>
353+
Writer(std::unique_ptr<T> writer)
354+
: _concept { std::static_pointer_cast<Concept>(
355+
std::make_shared<Model<T>>(std::move(writer))) }
356+
{
357+
}
358+
359+
GRAPHQLRESPONSE_EXPORT void write(Value value) const;
360+
};
361+
266362
} // namespace graphql::response
267363

268364
#endif // GRAPHQLRESPONSE_H

include/graphqlservice/GraphQLService.h

Lines changed: 113 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ struct await_worker_thread : coro::suspend_always
162162
}
163163
};
164164

165-
// Type-erased awaitable, if you want finer grain control.
165+
// Type-erased awaitable.
166166
class await_async : public coro::suspend_always
167167
{
168168
private:
@@ -172,6 +172,7 @@ class await_async : public coro::suspend_always
172172

173173
virtual bool await_ready() const = 0;
174174
virtual void await_suspend(coro::coroutine_handle<> h) const = 0;
175+
virtual void await_resume() const = 0;
175176
};
176177

177178
template <class T>
@@ -192,19 +193,33 @@ class await_async : public coro::suspend_always
192193
_pimpl->await_suspend(std::move(h));
193194
}
194195

196+
void await_resume() const final
197+
{
198+
_pimpl->await_resume();
199+
}
200+
195201
private:
196202
std::shared_ptr<T> _pimpl;
197203
};
198204

199205
const std::shared_ptr<Concept> _pimpl;
200206

201207
public:
208+
// Type-erased explicit constructor for a custom awaitable.
202209
template <class T>
203-
await_async(std::shared_ptr<T> pimpl)
210+
explicit await_async(std::shared_ptr<T> pimpl)
204211
: _pimpl { std::make_shared<Model<T>>(std::move(pimpl)) }
205212
{
206213
}
207214

215+
// Default to immediate synchronous execution.
216+
await_async()
217+
: _pimpl { std::static_pointer_cast<Concept>(
218+
std::make_shared<Model<coro::suspend_never>>(std::make_shared<coro::suspend_never>())) }
219+
{
220+
}
221+
222+
// Implicitly convert a std::launch parameter used with std::async to an awaitable.
208223
await_async(std::launch launch)
209224
: _pimpl { ((launch & std::launch::async) == std::launch::async)
210225
? std::static_pointer_cast<Concept>(std::make_shared<Model<await_worker_thread>>(
@@ -224,8 +239,9 @@ class await_async : public coro::suspend_always
224239
_pimpl->await_suspend(std::move(h));
225240
}
226241

227-
constexpr void await_resume() const noexcept
242+
void await_resume() const
228243
{
244+
_pimpl->await_resume();
229245
}
230246
};
231247

@@ -261,7 +277,7 @@ struct SelectionSetParams
261277
std::optional<field_path> errorPath;
262278

263279
// Async launch policy for sub-field resolvers.
264-
const await_async launch { std::launch::deferred };
280+
const await_async launch {};
265281
};
266282

267283
// Pass a common bundle of parameters to all of the generated Object::getField accessors.
@@ -931,19 +947,96 @@ GRAPHQLSERVICE_EXPORT AwaitableResolver ModifiedResult<Object>::convert(
931947
FieldResult<std::shared_ptr<Object>> result, ResolverParams params);
932948
#endif // GRAPHQL_DLLEXPORTS
933949

934-
using TypeMap = internal::string_view_map<std::shared_ptr<Object>>;
950+
// Subscription callbacks receive the response::Value representing the result of evaluating the
951+
// SelectionSet against the payload.
952+
using SubscriptionCallback = std::function<void(response::Value)>;
953+
954+
// Subscriptions are stored in maps using these keys.
955+
using SubscriptionKey = size_t;
956+
using SubscriptionName = std::string;
957+
958+
using AwaitableSubscribe = internal::Awaitable<SubscriptionKey>;
959+
using AwaitableUnsubscribe = internal::Awaitable<void>;
960+
using AwaitableDeliver = internal::Awaitable<void>;
935961

936-
// You can still sub-class RequestState and use that in the state parameter to Request::subscribe
937-
// to add your own state to the service callbacks that you receive while executing the subscription
938-
// query.
939-
struct SubscriptionParams
962+
struct RequestResolveParams
940963
{
964+
// Required query information.
965+
peg::ast& query;
966+
std::string_view operationName {};
967+
response::Value variables { response::Type::Map };
968+
969+
// Optional async execution awaitable.
970+
await_async launch;
971+
972+
// Optional sub-class of RequestState which will be passed to each resolver and field accessor.
941973
std::shared_ptr<RequestState> state;
974+
};
975+
976+
struct RequestSubscribeParams
977+
{
978+
// Callback which receives the event data.
979+
SubscriptionCallback callback;
980+
981+
// Required query information.
942982
peg::ast query;
943-
std::string operationName;
944-
response::Value variables;
983+
std::string operationName {};
984+
response::Value variables { response::Type::Map };
985+
986+
// Optional async execution awaitable.
987+
await_async launch;
988+
989+
// Optional sub-class of RequestState which will be passed to each resolver and field accessor.
990+
std::shared_ptr<RequestState> state;
991+
};
992+
993+
struct RequestUnsubscribeParams
994+
{
995+
// Key returned by a previous call to subscribe.
996+
SubscriptionKey key;
997+
998+
// Optional async execution awaitable.
999+
await_async launch;
9451000
};
9461001

1002+
using SubscriptionArguments = std::map<std::string_view, response::Value>;
1003+
using SubscriptionArgumentFilterCallback = std::function<bool(response::MapType::const_reference)>;
1004+
using SubscriptionDirectiveFilterCallback = std::function<bool(Directives::const_reference)>;
1005+
1006+
struct SubscriptionFilter
1007+
{
1008+
// Deliver to subscriptions on this field.
1009+
std::string_view field;
1010+
1011+
// Optional field argument filter, which can either be a set of required arguments, or a
1012+
// callback which returns true if the arguments match custom criteria.
1013+
std::optional<std::variant<SubscriptionArguments, SubscriptionArgumentFilterCallback>>
1014+
arguments;
1015+
1016+
// Optional field directives filter, which can either be a set of required directives and
1017+
// arguments, or a callback which returns true if the directives match custom criteria.
1018+
std::optional<std::variant<Directives, SubscriptionDirectiveFilterCallback>> directives;
1019+
};
1020+
1021+
// Deliver to a specific subscription key, or apply custom criteria for the field name, arguments,
1022+
// and directives in the Subscription query.
1023+
using RequestDeliverFilter = std::optional<std::variant<SubscriptionKey, SubscriptionFilter>>;
1024+
1025+
struct RequestDeliverParams
1026+
{
1027+
// Optional filter to control which subscriptions will receive the event. If not specified,
1028+
// every subscription will receive the event and evaluate their queries against it.
1029+
RequestDeliverFilter filter;
1030+
1031+
// Optional async execution awaitable.
1032+
await_async launch;
1033+
1034+
// Optional override for the default Subscription operation object.
1035+
std::shared_ptr<Object> subscriptionObject;
1036+
};
1037+
1038+
using TypeMap = internal::string_view_map<std::shared_ptr<Object>>;
1039+
9471040
// State which is captured and kept alive until all pending futures have been resolved for an
9481041
// operation. Note: SelectionSet is the other parameter that gets passed to the top level Object,
9491042
// it's a borrowed reference to an element in the AST. In the case of query and mutation operations,
@@ -961,21 +1054,6 @@ struct OperationData : std::enable_shared_from_this<OperationData>
9611054
FragmentMap fragments;
9621055
};
9631056

964-
// Subscription callbacks receive the response::Value representing the result of evaluating the
965-
// SelectionSet against the payload.
966-
using SubscriptionCallback = std::function<void(response::Value)>;
967-
using SubscriptionArguments = std::map<std::string_view, response::Value>;
968-
using SubscriptionArgumentFilterCallback = std::function<bool(response::MapType::const_reference)>;
969-
using SubscriptionDirectiveFilterCallback = std::function<bool(Directives::const_reference)>;
970-
971-
// Subscriptions are stored in maps using these keys.
972-
using SubscriptionKey = size_t;
973-
using SubscriptionName = std::string;
974-
975-
using AwaitableSubscribe = internal::Awaitable<SubscriptionKey>;
976-
using AwaitableUnsubscribe = internal::Awaitable<void>;
977-
using AwaitableDeliver = internal::Awaitable<void>;
978-
9791057
// Registration information for subscription, cached in the Request::subscribe call.
9801058
struct SubscriptionData : std::enable_shared_from_this<SubscriptionData>
9811059
{
@@ -1014,51 +1092,17 @@ class Request : public std::enable_shared_from_this<Request>
10141092
GRAPHQLSERVICE_EXPORT std::pair<std::string_view, const peg::ast_node*> findOperationDefinition(
10151093
peg::ast& query, std::string_view operationName) const;
10161094

1017-
GRAPHQLSERVICE_EXPORT response::AwaitableValue resolve(std::shared_ptr<RequestState> state,
1018-
peg::ast& query, std::string_view operationName, response::Value variables) const;
1019-
GRAPHQLSERVICE_EXPORT response::AwaitableValue resolve(await_async launch,
1020-
std::shared_ptr<RequestState> state, peg::ast& query, std::string_view operationName,
1021-
response::Value variables) const;
1022-
1023-
GRAPHQLSERVICE_EXPORT SubscriptionKey subscribe(
1024-
SubscriptionParams&& params, SubscriptionCallback&& callback);
1025-
GRAPHQLSERVICE_EXPORT AwaitableSubscribe subscribe(
1026-
await_async launch, SubscriptionParams&& params, SubscriptionCallback&& callback);
1027-
1028-
GRAPHQLSERVICE_EXPORT void unsubscribe(SubscriptionKey key);
1029-
GRAPHQLSERVICE_EXPORT AwaitableUnsubscribe unsubscribe(await_async launch, SubscriptionKey key);
1030-
1031-
GRAPHQLSERVICE_EXPORT void deliver(
1032-
const SubscriptionName& name, const std::shared_ptr<Object>& subscriptionObject) const;
1033-
GRAPHQLSERVICE_EXPORT void deliver(const SubscriptionName& name,
1034-
const SubscriptionArguments& arguments, std::shared_ptr<Object> subscriptionObject) const;
1035-
GRAPHQLSERVICE_EXPORT void deliver(const SubscriptionName& name,
1036-
const SubscriptionArguments& arguments, const Directives& directives,
1037-
std::shared_ptr<Object> subscriptionObject) const;
1038-
GRAPHQLSERVICE_EXPORT void deliver(const SubscriptionName& name,
1039-
const SubscriptionArgumentFilterCallback& applyArguments,
1040-
std::shared_ptr<Object> subscriptionObject) const;
1041-
GRAPHQLSERVICE_EXPORT void deliver(const SubscriptionName& name,
1042-
const SubscriptionArgumentFilterCallback& applyArguments,
1043-
const SubscriptionDirectiveFilterCallback& applyDirectives,
1044-
std::shared_ptr<Object> subscriptionObject) const;
1045-
1046-
GRAPHQLSERVICE_EXPORT AwaitableDeliver deliver(await_async launch, const SubscriptionName& name,
1047-
std::shared_ptr<Object> subscriptionObject) const;
1048-
GRAPHQLSERVICE_EXPORT AwaitableDeliver deliver(await_async launch, const SubscriptionName& name,
1049-
const SubscriptionArguments& arguments, std::shared_ptr<Object> subscriptionObject) const;
1050-
GRAPHQLSERVICE_EXPORT AwaitableDeliver deliver(await_async launch, const SubscriptionName& name,
1051-
const SubscriptionArguments& arguments, const Directives& directives,
1052-
std::shared_ptr<Object> subscriptionObject) const;
1053-
GRAPHQLSERVICE_EXPORT AwaitableDeliver deliver(await_async launch, const SubscriptionName& name,
1054-
const SubscriptionArgumentFilterCallback& applyArguments,
1055-
std::shared_ptr<Object> subscriptionObject) const;
1056-
GRAPHQLSERVICE_EXPORT AwaitableDeliver deliver(await_async launch, const SubscriptionName& name,
1057-
const SubscriptionArgumentFilterCallback& applyArguments,
1058-
const SubscriptionDirectiveFilterCallback& applyDirectives,
1059-
std::shared_ptr<Object> subscriptionObject) const;
1095+
GRAPHQLSERVICE_EXPORT response::AwaitableValue resolve(RequestResolveParams params) const;
1096+
GRAPHQLSERVICE_EXPORT AwaitableSubscribe subscribe(RequestSubscribeParams params);
1097+
GRAPHQLSERVICE_EXPORT AwaitableUnsubscribe unsubscribe(RequestUnsubscribeParams params);
1098+
GRAPHQLSERVICE_EXPORT AwaitableDeliver deliver(RequestDeliverParams params) const;
10601099

10611100
private:
1101+
SubscriptionKey addSubscription(RequestSubscribeParams&& params);
1102+
void removeSubscription(SubscriptionKey key);
1103+
std::vector<std::shared_ptr<SubscriptionData>> collectRegistrations(
1104+
RequestDeliverFilter&& filter) const noexcept;
1105+
10621106
const TypeMap _operations;
10631107
std::unique_ptr<ValidateExecutableVisitor> _validation;
10641108
internal::sorted_map<SubscriptionKey, std::shared_ptr<SubscriptionData>> _subscriptions;

samples/client/benchmark.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,7 @@ int main(int argc, char** argv)
141141
auto query = GetRequestObject();
142142

143143
const auto startResolve = std::chrono::steady_clock::now();
144-
auto response =
145-
service->resolve(nullptr, query, "", response::Value(response::Type::Map)).get();
144+
auto response = service->resolve({ query }).get();
146145
const auto startParseServiceResponse = std::chrono::steady_clock::now();
147146
auto serviceResponse = client::parseServiceResponse(std::move(response));
148147
const auto startParseResponse = std::chrono::steady_clock::now();

0 commit comments

Comments
 (0)