Skip to content

Commit e1e6aef

Browse files
authored
Merge pull request #119 from wravery/notify-on-subscribe
Notify on subscribe
2 parents 7028ef9 + 0db2d3a commit e1e6aef

File tree

4 files changed

+245
-7
lines changed

4 files changed

+245
-7
lines changed

include/graphqlservice/GraphQLService.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,33 @@ constexpr std::string_view strSubscription { "subscription"sv };
113113

114114
}
115115

116+
// Resolvers may be called in multiple different Operation contexts.
117+
enum class ResolverContext
118+
{
119+
// Resolving a Query operation.
120+
Query,
121+
122+
// Resolving a Mutation operation.
123+
Mutation,
124+
125+
// Adding a Subscription. If you need to prepare to send events for this Subsciption
126+
// (e.g. registering an event sink of your own), this is a chance to do that.
127+
NotifySubscribe,
128+
129+
// Resolving a Subscription event.
130+
Subscription,
131+
132+
// Removing a Subscription. If there are no more Subscriptions registered this is an
133+
// opportunity to release resources which are no longer needed.
134+
NotifyUnsubscribe,
135+
};
136+
116137
// Pass a common bundle of parameters to all of the generated Object::getField accessors in a SelectionSet
117138
struct SelectionSetParams
118139
{
140+
// Context for this selection set.
141+
const ResolverContext resolverContext;
142+
119143
// The lifetime of each of these borrowed references is guaranteed until the future returned
120144
// by the accessor is resolved or destroyed. They are owned by the OperationData shared pointer.
121145
const std::shared_ptr<RequestState>& state;
@@ -855,7 +879,10 @@ class Request : public std::enable_shared_from_this<Request>
855879
GRAPHQLSERVICE_EXPORT std::future<response::Value> resolve(std::launch launch, const std::shared_ptr<RequestState>& state, peg::ast& query, const std::string& operationName, response::Value&& variables) const;
856880

857881
GRAPHQLSERVICE_EXPORT SubscriptionKey subscribe(SubscriptionParams&& params, SubscriptionCallback&& callback);
882+
GRAPHQLSERVICE_EXPORT std::future<SubscriptionKey> subscribe(std::launch launch, SubscriptionParams&& params, SubscriptionCallback&& callback);
883+
858884
GRAPHQLSERVICE_EXPORT void unsubscribe(SubscriptionKey key);
885+
GRAPHQLSERVICE_EXPORT std::future<void> unsubscribe(std::launch launch, SubscriptionKey key);
859886

860887
GRAPHQLSERVICE_EXPORT void deliver(const SubscriptionName& name, const std::shared_ptr<Object>& subscriptionObject) const;
861888
GRAPHQLSERVICE_EXPORT void deliver(const SubscriptionName& name, const SubscriptionArguments& arguments, const std::shared_ptr<Object>& subscriptionObject) const;

samples/today/TodayMock.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,50 @@ class NextAppointmentChange : public object::Subscription
434434
{
435435
}
436436

437+
static size_t getCount(service::ResolverContext resolverContext)
438+
{
439+
switch (resolverContext)
440+
{
441+
case service::ResolverContext::NotifySubscribe:
442+
return _notifySubscribeCount;
443+
444+
case service::ResolverContext::Subscription:
445+
return _subscriptionCount;
446+
447+
case service::ResolverContext::NotifyUnsubscribe:
448+
return _notifyUnsubscribeCount;
449+
450+
default:
451+
throw std::runtime_error("Unexpected ResolverContext");
452+
}
453+
}
454+
437455
service::FieldResult<std::shared_ptr<object::Appointment>> getNextAppointmentChange(service::FieldParams&& params) const final
438456
{
457+
switch (params.resolverContext)
458+
{
459+
case service::ResolverContext::NotifySubscribe:
460+
{
461+
++_notifySubscribeCount;
462+
break;
463+
}
464+
465+
case service::ResolverContext::Subscription:
466+
{
467+
++_subscriptionCount;
468+
break;
469+
}
470+
471+
case service::ResolverContext::NotifyUnsubscribe:
472+
{
473+
++_notifyUnsubscribeCount;
474+
break;
475+
}
476+
477+
default:
478+
throw std::runtime_error("Unexpected ResolverContext");
479+
}
480+
439481
return std::static_pointer_cast<object::Appointment>(_changeNextAppointment(params.state));
440482
}
441483

@@ -446,6 +488,10 @@ class NextAppointmentChange : public object::Subscription
446488

447489
private:
448490
nextAppointmentChange _changeNextAppointment;
491+
492+
static size_t _notifySubscribeCount;
493+
static size_t _subscriptionCount;
494+
static size_t _notifyUnsubscribeCount;
449495
};
450496

451497
class NodeChange : public object::Subscription

src/GraphQLService.cpp

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,7 @@ class SelectionVisitor
913913
void visitFragmentSpread(const peg::ast_node& fragmentSpread);
914914
void visitInlineFragment(const peg::ast_node& inlineFragment);
915915

916+
const ResolverContext _resolverContext;
916917
const std::shared_ptr<RequestState>& _state;
917918
const response::Value& _operationDirectives;
918919
const field_path _path;
@@ -930,7 +931,8 @@ class SelectionVisitor
930931
SelectionVisitor::SelectionVisitor(const SelectionSetParams& selectionSetParams,
931932
const FragmentMap& fragments, const response::Value& variables,
932933
const TypeNames& typeNames, const ResolverMap& resolvers)
933-
: _state(selectionSetParams.state)
934+
: _resolverContext(selectionSetParams.resolverContext)
935+
, _state(selectionSetParams.state)
934936
, _operationDirectives(selectionSetParams.operationDirectives)
935937
, _path(selectionSetParams.errorPath)
936938
, _launch(selectionSetParams.launch)
@@ -1065,6 +1067,7 @@ void SelectionVisitor::visitField(const peg::ast_node& field)
10651067
path.push({ alias });
10661068

10671069
SelectionSetParams selectionSetParams {
1070+
_resolverContext,
10681071
_state,
10691072
_operationDirectives,
10701073
_fragmentDirectives.top().fragmentDefinitionDirectives,
@@ -1445,21 +1448,25 @@ void FragmentDefinitionVisitor::visit(const peg::ast_node& fragmentDefinition)
14451448
class OperationDefinitionVisitor
14461449
{
14471450
public:
1448-
OperationDefinitionVisitor(std::launch launch, std::shared_ptr<RequestState> state, const TypeMap& operations, response::Value&& variables, FragmentMap&& fragments);
1451+
OperationDefinitionVisitor(ResolverContext resolverContext, std::launch launch, std::shared_ptr<RequestState> state,
1452+
const TypeMap& operations, response::Value&& variables, FragmentMap&& fragments);
14491453

14501454
std::future<response::Value> getValue();
14511455

14521456
void visit(const std::string& operationType, const peg::ast_node& operationDefinition);
14531457

14541458
private:
1459+
const ResolverContext _resolverContext;
14551460
const std::launch _launch;
14561461
std::shared_ptr<OperationData> _params;
14571462
const TypeMap& _operations;
14581463
std::future<response::Value> _result;
14591464
};
14601465

1461-
OperationDefinitionVisitor::OperationDefinitionVisitor(std::launch launch, std::shared_ptr<RequestState> state, const TypeMap& operations, response::Value&& variables, FragmentMap&& fragments)
1462-
: _launch(launch)
1466+
OperationDefinitionVisitor::OperationDefinitionVisitor(ResolverContext resolverContext, std::launch launch, std::shared_ptr<RequestState> state,
1467+
const TypeMap& operations, response::Value&& variables, FragmentMap&& fragments)
1468+
: _resolverContext(resolverContext)
1469+
, _launch(launch)
14631470
, _params(std::make_shared<OperationData>(
14641471
std::move(state),
14651472
std::move(variables),
@@ -1534,11 +1541,12 @@ void OperationDefinitionVisitor::visit(const std::string& operationType, const p
15341541

15351542
// Keep the params alive until the deferred lambda has executed
15361543
_result = std::async(_launch,
1537-
[selectionLaunch = _launch, params = std::move(_params), operation = itr->second](const peg::ast_node& selection)
1544+
[selectionContext = _resolverContext, selectionLaunch = _launch, params = std::move(_params), operation = itr->second](const peg::ast_node& selection)
15381545
{
15391546
// The top level object doesn't come from inside of a fragment, so all of the fragment directives are empty.
15401547
const response::Value emptyFragmentDirectives(response::Type::Map);
15411548
const SelectionSetParams selectionSetParams {
1549+
selectionContext,
15421550
params->state,
15431551
params->directives,
15441552
emptyFragmentDirectives,
@@ -1998,14 +2006,20 @@ std::future<response::Value> Request::resolveValidated(std::launch launch, const
19982006
throw schema_exception { { schema_error{ message.str(), { position.line, position.column } } } };
19992007
}
20002008

2009+
const bool isMutation = (operationDefinition.first == strMutation);
2010+
20012011
// http://spec.graphql.org/June2018/#sec-Normal-and-Serial-Execution
2002-
if (operationDefinition.first == strMutation)
2012+
if (isMutation)
20032013
{
20042014
// Force mutations to perform serial execution
20052015
launch = std::launch::deferred;
20062016
}
20072017

2008-
OperationDefinitionVisitor operationVisitor(launch, state, _operations, std::move(variables), std::move(fragments));
2018+
const auto resolverContext = isMutation
2019+
? ResolverContext::Mutation
2020+
: ResolverContext::Query;
2021+
2022+
OperationDefinitionVisitor operationVisitor(resolverContext, launch, state, _operations, std::move(variables), std::move(fragments));
20092023

20102024
operationVisitor.visit(operationDefinition.first, *operationDefinition.second);
20112025

@@ -2090,6 +2104,45 @@ SubscriptionKey Request::subscribe(SubscriptionParams&& params, SubscriptionCall
20902104
return key;
20912105
}
20922106

2107+
std::future<SubscriptionKey> Request::subscribe(std::launch launch, SubscriptionParams&& params, SubscriptionCallback&& callback)
2108+
{
2109+
return std::async(launch, [spThis = shared_from_this(), launch](SubscriptionParams&& paramsFuture, SubscriptionCallback&& callbackFuture)
2110+
{
2111+
const auto key = spThis->subscribe(std::move(paramsFuture), std::move(callbackFuture));
2112+
const auto itrOperation = spThis->_operations.find(std::string { strSubscription });
2113+
2114+
if (itrOperation != spThis->_operations.cend())
2115+
{
2116+
const auto& operation = itrOperation->second;
2117+
const auto& registration = spThis->_subscriptions.at(key);
2118+
response::Value emptyFragmentDirectives(response::Type::Map);
2119+
const SelectionSetParams selectionSetParams {
2120+
ResolverContext::NotifySubscribe,
2121+
registration->data->state,
2122+
registration->data->directives,
2123+
emptyFragmentDirectives,
2124+
emptyFragmentDirectives,
2125+
emptyFragmentDirectives,
2126+
{},
2127+
launch,
2128+
};
2129+
2130+
try
2131+
{
2132+
operation->resolve(selectionSetParams, registration->selection, registration->data->fragments, registration->data->variables).get();
2133+
}
2134+
catch (const std::exception& ex)
2135+
{
2136+
// Rethrow the exception, but don't leave it subscribed if the resolver failed.
2137+
spThis->unsubscribe(key);
2138+
throw ex;
2139+
}
2140+
}
2141+
2142+
return key;
2143+
}, std::move(params), std::move(callback));
2144+
}
2145+
20932146
void Request::unsubscribe(SubscriptionKey key)
20942147
{
20952148
auto itrSubscription = _subscriptions.find(key);
@@ -2119,6 +2172,35 @@ void Request::unsubscribe(SubscriptionKey key)
21192172
}
21202173
}
21212174

2175+
std::future<void> Request::unsubscribe(std::launch launch, SubscriptionKey key)
2176+
{
2177+
return std::async(launch, [spThis = shared_from_this(), launch, key]()
2178+
{
2179+
const auto itrOperation = spThis->_operations.find(std::string { strSubscription });
2180+
2181+
if (itrOperation != spThis->_operations.cend())
2182+
{
2183+
const auto& operation = itrOperation->second;
2184+
const auto& registration = spThis->_subscriptions.at(key);
2185+
response::Value emptyFragmentDirectives(response::Type::Map);
2186+
const SelectionSetParams selectionSetParams {
2187+
ResolverContext::NotifyUnsubscribe,
2188+
registration->data->state,
2189+
registration->data->directives,
2190+
emptyFragmentDirectives,
2191+
emptyFragmentDirectives,
2192+
emptyFragmentDirectives,
2193+
{},
2194+
launch,
2195+
};
2196+
2197+
operation->resolve(selectionSetParams, registration->selection, registration->data->fragments, registration->data->variables).get();
2198+
}
2199+
2200+
spThis->unsubscribe(key);
2201+
});
2202+
}
2203+
21222204
void Request::deliver(const SubscriptionName& name, const std::shared_ptr<Object>& subscriptionObject) const
21232205
{
21242206
deliver(std::launch::deferred, name, subscriptionObject);
@@ -2193,6 +2275,7 @@ void Request::deliver(std::launch launch, const SubscriptionName& name, const Su
21932275
std::future<response::Value> result;
21942276
response::Value emptyFragmentDirectives(response::Type::Map);
21952277
const SelectionSetParams selectionSetParams {
2278+
ResolverContext::Subscription,
21962279
registration->data->state,
21972280
registration->data->directives,
21982281
emptyFragmentDirectives,

test/TodayTests.cpp

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,3 +1413,85 @@ TEST_F(TodayServiceCase, QueryAppointmentsThroughUnionTypeFragment)
14131413
FAIL() << response::toJSON(ex.getErrors());
14141414
}
14151415
}
1416+
1417+
size_t today::NextAppointmentChange::_notifySubscribeCount = 0;
1418+
size_t today::NextAppointmentChange::_subscriptionCount = 0;
1419+
size_t today::NextAppointmentChange::_notifyUnsubscribeCount = 0;
1420+
1421+
TEST_F(TodayServiceCase, SubscribeUnsubscribeNotificationsAsync)
1422+
{
1423+
auto query = peg::parseString(R"(subscription TestSubscription {
1424+
nextAppointment: nextAppointmentChange {
1425+
nextAppointmentId: id
1426+
when
1427+
subject
1428+
isNow
1429+
}
1430+
})");
1431+
response::Value variables(response::Type::Map);
1432+
auto state = std::make_shared<today::RequestState>(21);
1433+
bool calledCallback = false;
1434+
const auto notifySubscribeBegin = today::NextAppointmentChange::getCount(service::ResolverContext::NotifySubscribe);
1435+
const auto subscriptionBegin = today::NextAppointmentChange::getCount(service::ResolverContext::Subscription);
1436+
const auto notifyUnsubscribeBegin = today::NextAppointmentChange::getCount(service::ResolverContext::NotifyUnsubscribe);
1437+
auto key = _service->subscribe(std::launch::async, service::SubscriptionParams { state, std::move(query), "TestSubscription", std::move(std::move(variables)) },
1438+
[&calledCallback](std::future<response::Value> response)
1439+
{
1440+
calledCallback = true;
1441+
});
1442+
_service->unsubscribe(std::launch::async, key.get()).get();
1443+
const auto notifySubscribeEnd = today::NextAppointmentChange::getCount(service::ResolverContext::NotifySubscribe);
1444+
const auto subscriptionEnd = today::NextAppointmentChange::getCount(service::ResolverContext::Subscription);
1445+
const auto notifyUnsubscribeEnd = today::NextAppointmentChange::getCount(service::ResolverContext::NotifyUnsubscribe);
1446+
1447+
try
1448+
{
1449+
EXPECT_FALSE(calledCallback);
1450+
EXPECT_EQ(notifySubscribeBegin + 1, notifySubscribeEnd) << "should pass NotifySubscribe once";
1451+
EXPECT_EQ(subscriptionBegin, subscriptionEnd) << "should not pass Subscription";
1452+
EXPECT_EQ(notifyUnsubscribeBegin + 1, notifyUnsubscribeEnd) << "should pass NotifyUnsubscribe once";
1453+
}
1454+
catch (service::schema_exception & ex)
1455+
{
1456+
FAIL() << response::toJSON(ex.getErrors());
1457+
}
1458+
}
1459+
1460+
TEST_F(TodayServiceCase, SubscribeUnsubscribeNotificationsDeferred)
1461+
{
1462+
auto query = peg::parseString(R"(subscription TestSubscription {
1463+
nextAppointment: nextAppointmentChange {
1464+
nextAppointmentId: id
1465+
when
1466+
subject
1467+
isNow
1468+
}
1469+
})");
1470+
response::Value variables(response::Type::Map);
1471+
auto state = std::make_shared<today::RequestState>(21);
1472+
bool calledCallback = false;
1473+
const auto notifySubscribeBegin = today::NextAppointmentChange::getCount(service::ResolverContext::NotifySubscribe);
1474+
const auto subscriptionBegin = today::NextAppointmentChange::getCount(service::ResolverContext::Subscription);
1475+
const auto notifyUnsubscribeBegin = today::NextAppointmentChange::getCount(service::ResolverContext::NotifyUnsubscribe);
1476+
auto key = _service->subscribe(std::launch::deferred, service::SubscriptionParams { state, std::move(query), "TestSubscription", std::move(std::move(variables)) },
1477+
[&calledCallback](std::future<response::Value> response)
1478+
{
1479+
calledCallback = true;
1480+
});
1481+
_service->unsubscribe(std::launch::deferred, key.get()).get();
1482+
const auto notifySubscribeEnd = today::NextAppointmentChange::getCount(service::ResolverContext::NotifySubscribe);
1483+
const auto subscriptionEnd = today::NextAppointmentChange::getCount(service::ResolverContext::Subscription);
1484+
const auto notifyUnsubscribeEnd = today::NextAppointmentChange::getCount(service::ResolverContext::NotifyUnsubscribe);
1485+
1486+
try
1487+
{
1488+
EXPECT_FALSE(calledCallback);
1489+
EXPECT_EQ(notifySubscribeBegin + 1, notifySubscribeEnd) << "should pass NotifySubscribe once";
1490+
EXPECT_EQ(subscriptionBegin, subscriptionEnd) << "should not pass Subscription";
1491+
EXPECT_EQ(notifyUnsubscribeBegin + 1, notifyUnsubscribeEnd) << "should pass NotifyUnsubscribe once";
1492+
}
1493+
catch (service::schema_exception & ex)
1494+
{
1495+
FAIL() << response::toJSON(ex.getErrors());
1496+
}
1497+
}

0 commit comments

Comments
 (0)