Skip to content

Commit 2afe417

Browse files
committed
Keep subscription data alive for callbacks
1 parent 6fbbb12 commit 2afe417

File tree

2 files changed

+70
-41
lines changed

2 files changed

+70
-41
lines changed

GraphQLService.cpp

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,13 @@ void Object::endSelectionSet(const std::shared_ptr<RequestState>&) const
899899
{
900900
}
901901

902+
OperationData::OperationData(std::shared_ptr<RequestState>&& state, response::Value&& variables, FragmentMap&& fragments)
903+
: state(std::move(state))
904+
, variables(std::move(variables))
905+
, fragments(std::move(fragments))
906+
{
907+
}
908+
902909
// FragmentDefinitionVisitor visits the AST and collects all of the fragment
903910
// definitions in the document.
904911
class FragmentDefinitionVisitor
@@ -929,22 +936,6 @@ void FragmentDefinitionVisitor::visit(const peg::ast_node& fragmentDefinition)
929936
_fragments.insert({ fragmentDefinition.children.front()->content(), Fragment(fragmentDefinition) });
930937
}
931938

932-
struct OperationParams : public std::enable_shared_from_this<OperationParams>
933-
{
934-
explicit OperationParams(std::shared_ptr<RequestState>&& state, response::Value&& variables, FragmentMap&& fragments);
935-
936-
std::shared_ptr<RequestState> state;
937-
response::Value variables;
938-
FragmentMap fragments;
939-
};
940-
941-
OperationParams::OperationParams(std::shared_ptr<RequestState>&& state, response::Value&& variables, FragmentMap&& fragments)
942-
: state(std::move(state))
943-
, variables(std::move(variables))
944-
, fragments(std::move(fragments))
945-
{
946-
}
947-
948939
// OperationDefinitionVisitor visits the AST and executes the one with the specified
949940
// operation name.
950941
class OperationDefinitionVisitor
@@ -957,14 +948,17 @@ class OperationDefinitionVisitor
957948
void visit(const peg::ast_node& operationDefinition);
958949

959950
private:
960-
std::shared_ptr<OperationParams> _params;
951+
std::shared_ptr<OperationData> _params;
961952
const TypeMap& _operations;
962953
const std::string& _operationName;
963954
std::future<response::Value> _result;
964955
};
965956

966957
OperationDefinitionVisitor::OperationDefinitionVisitor(std::shared_ptr<RequestState> state, const TypeMap& operations, const std::string& operationName, response::Value&& variables, FragmentMap&& fragments)
967-
: _params(std::make_shared<OperationParams>(std::move(state), std::move(variables), std::move(fragments)))
958+
: _params(std::make_shared<OperationData>(
959+
std::move(state),
960+
std::move(variables),
961+
std::move(fragments)))
968962
, _operations(operations)
969963
, _operationName(operationName)
970964
{
@@ -1151,6 +1145,18 @@ void OperationDefinitionVisitor::visit(const peg::ast_node& operationDefinition)
11511145
}
11521146
}
11531147

1148+
SubscriptionData::SubscriptionData(std::shared_ptr<OperationData>&& data, std::unordered_set<SubscriptionName>&& fieldNames,
1149+
std::unique_ptr<peg::ast<std::string>>&& query, std::string&& operationName, SubscriptionCallback&& callback,
1150+
const peg::ast_node& selection)
1151+
: data(std::move(data))
1152+
, fieldNames(std::move(fieldNames))
1153+
, query(std::move(query))
1154+
, operationName(std::move(operationName))
1155+
, callback(std::move(callback))
1156+
, selection(selection)
1157+
{
1158+
}
1159+
11541160
// SubscriptionDefinitionVisitor visits the AST collects the fields referenced in the subscription at the point
11551161
// where we create a subscription.
11561162
class SubscriptionDefinitionVisitor
@@ -1159,15 +1165,15 @@ class SubscriptionDefinitionVisitor
11591165
SubscriptionDefinitionVisitor(SubscriptionParams&& params, SubscriptionCallback&& callback, FragmentMap&& fragments);
11601166

11611167
const peg::ast_node& getRoot() const;
1162-
SubscriptionRegistration getRegistration();
1168+
std::shared_ptr<SubscriptionData> getRegistration();
11631169

11641170
void visit(const peg::ast_node& operationDefinition);
11651171

11661172
private:
11671173
SubscriptionParams _params;
11681174
SubscriptionCallback _callback;
11691175
FragmentMap _fragments;
1170-
std::unique_ptr<SubscriptionRegistration> _result;
1176+
std::shared_ptr<SubscriptionData> _result;
11711177
};
11721178

11731179
SubscriptionDefinitionVisitor::SubscriptionDefinitionVisitor(SubscriptionParams&& params, SubscriptionCallback&& callback, FragmentMap&& fragments)
@@ -1182,7 +1188,7 @@ const peg::ast_node& SubscriptionDefinitionVisitor::getRoot() const
11821188
return *_params.query->root;
11831189
}
11841190

1185-
SubscriptionRegistration SubscriptionDefinitionVisitor::getRegistration()
1191+
std::shared_ptr<SubscriptionData> SubscriptionDefinitionVisitor::getRegistration()
11861192
{
11871193
if (!_result)
11881194
{
@@ -1198,7 +1204,7 @@ SubscriptionRegistration SubscriptionDefinitionVisitor::getRegistration()
11981204
throw schema_exception({ error.str() });
11991205
}
12001206

1201-
auto result = std::move(*_result);
1207+
auto result = std::move(_result);
12021208

12031209
_result.reset();
12041210

@@ -1274,13 +1280,16 @@ void SubscriptionDefinitionVisitor::visit(const peg::ast_node& operationDefiniti
12741280
});
12751281
});
12761282

1277-
_result.reset(new SubscriptionRegistration {
1278-
std::move(_params),
1279-
std::move(_callback),
1280-
selection,
1283+
_result = std::make_shared<SubscriptionData>(
1284+
std::make_shared<OperationData>(
1285+
std::move(_params.state),
1286+
std::move(_params.variables),
1287+
std::move(_fragments)),
12811288
std::move(fieldNames),
1282-
std::move(_fragments)
1283-
});
1289+
std::move(_params.query),
1290+
std::move(_params.operationName),
1291+
std::move(_callback),
1292+
selection);
12841293
}
12851294

12861295
Request::Request(TypeMap&& operationTypes)
@@ -1332,7 +1341,7 @@ SubscriptionKey Request::subscribe(SubscriptionParams&& params, SubscriptionCall
13321341
auto registration = subscriptionVisitor.getRegistration();
13331342
auto key = _nextKey++;
13341343

1335-
for (const auto& name : registration.fieldNames)
1344+
for (const auto& name : registration->fieldNames)
13361345
{
13371346
_listeners[name].insert(key);
13381347
}
@@ -1351,7 +1360,7 @@ void Request::unsubscribe(SubscriptionKey key)
13511360
return;
13521361
}
13531362

1354-
for (const auto& name : itrSubscription->second.fieldNames)
1363+
for (const auto& name : itrSubscription->second->fieldNames)
13551364
{
13561365
auto itrListener = _listeners.find(name);
13571366

@@ -1390,20 +1399,20 @@ void Request::deliver(const SubscriptionName& name, const std::shared_ptr<Object
13901399
for (const auto& key : itrListeners->second)
13911400
{
13921401
auto itrSubscription = _subscriptions.find(key);
1393-
const auto& registration = itrSubscription->second;
1402+
auto registration = itrSubscription->second;
13941403
std::future<response::Value> result;
13951404

13961405
try
13971406
{
13981407
result = std::async(std::launch::deferred,
1399-
[](std::future<response::Value> data)
1408+
[registration](std::future<response::Value> data)
14001409
{
14011410
response::Value document(response::Type::Map);
14021411

14031412
document.emplace_back("data", data.get());
14041413

14051414
return document;
1406-
}, optionalOrDefaultSubscription->resolve(registration.params.state, registration.selection, registration.fragments, registration.params.variables));
1415+
}, optionalOrDefaultSubscription->resolve(registration->data->state, registration->selection, registration->data->fragments, registration->data->variables));
14071416
}
14081417
catch (const schema_exception& ex)
14091418
{
@@ -1416,7 +1425,7 @@ void Request::deliver(const SubscriptionName& name, const std::shared_ptr<Object
14161425
result = promise.get_future();
14171426
}
14181427

1419-
registration.callback(std::move(result));
1428+
registration->callback(std::move(result));
14201429
}
14211430
}
14221431

include/graphqlservice/GraphQLService.h

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -464,22 +464,42 @@ struct SubscriptionParams
464464
response::Value variables;
465465
};
466466

467-
// Subscription callbacks receive a reference to the SubscriptionParams and the response::Value
468-
// representing the result of evaluating the SelectionSet against the payload.
467+
// State which is captured and kept alive until all pending futures have been resolved for an operation.
468+
// Note: SelectionSet is the other parameter that gets passed to the top level Object, it's a borrowed
469+
// reference to an element in the AST. In the case of query and mutation operations, it's up to the caller
470+
// to guarantee the lifetime of the AST exceeds the futures we return. Subscription operations need to
471+
// hold onto the queries in SubscriptionData, so the lifetime is already tied to the registration and
472+
// any pending futures passed to callbacks.
473+
struct OperationData : std::enable_shared_from_this<OperationData>
474+
{
475+
explicit OperationData(std::shared_ptr<RequestState>&& state, response::Value&& variables, FragmentMap&& fragments);
476+
477+
std::shared_ptr<RequestState> state;
478+
response::Value variables;
479+
FragmentMap fragments;
480+
};
481+
482+
// Subscription callbacks receive the response::Value representing the result of evaluating the
483+
// SelectionSet against the payload.
469484
using SubscriptionCallback = std::function<void(std::future<response::Value>)>;
470485

471486
// Subscriptions are stored in maps using these keys.
472487
using SubscriptionKey = size_t;
473488
using SubscriptionName = std::string;
474489

475490
// Registration information for subscription, cached in the Request::subscribe call.
476-
struct SubscriptionRegistration
491+
struct SubscriptionData : std::enable_shared_from_this<SubscriptionData>
477492
{
478-
SubscriptionParams params;
493+
explicit SubscriptionData(std::shared_ptr<OperationData>&& data, std::unordered_set<SubscriptionName>&& fieldNames,
494+
std::unique_ptr<peg::ast<std::string>>&& query, std::string&& operationName, SubscriptionCallback&& callback,
495+
const peg::ast_node& selection);
496+
497+
std::shared_ptr<OperationData> data;
498+
std::unordered_set<SubscriptionName> fieldNames;
499+
std::unique_ptr<peg::ast<std::string>> query;
500+
std::string operationName;
479501
SubscriptionCallback callback;
480502
const peg::ast_node& selection;
481-
std::unordered_set<SubscriptionName> fieldNames;
482-
FragmentMap fragments;
483503
};
484504

485505
// Request scans the fragment definitions and finds the right operation definition to interpret
@@ -500,7 +520,7 @@ class Request : public std::enable_shared_from_this<Request>
500520

501521
private:
502522
TypeMap _operations;
503-
std::map<SubscriptionKey, SubscriptionRegistration> _subscriptions;
523+
std::map<SubscriptionKey, std::shared_ptr<SubscriptionData>> _subscriptions;
504524
std::unordered_map<SubscriptionName, std::set<SubscriptionKey>> _listeners;
505525
SubscriptionKey _nextKey = 0;
506526
};

0 commit comments

Comments
 (0)