Skip to content

Commit 479a43e

Browse files
committed
Implement GraphQL subscriptions
1 parent f7a2f17 commit 479a43e

File tree

4 files changed

+391
-42
lines changed

4 files changed

+391
-42
lines changed

GraphQLService.cpp

Lines changed: 278 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ void SelectionVisitor::visitField(const peg::ast_node& field)
644644

645645
_values.push({
646646
std::move(alias),
647-
itr->second({ _state, arguments, selection, _fragments, _variables })
647+
itr->second({ _state, std::move(arguments), selection, _fragments, _variables })
648648
});
649649
}
650650

@@ -998,6 +998,11 @@ void OperationDefinitionVisitor::visit(const peg::ast_node& operationDefinition)
998998
{
999999
operation = "query";
10001000
}
1001+
else if (operation == "subscription")
1002+
{
1003+
// Skip subscription operations, they should use subscribe instead of resolve.
1004+
return;
1005+
}
10011006

10021007
auto position = operationDefinition.begin();
10031008
std::string name;
@@ -1060,56 +1065,55 @@ void OperationDefinitionVisitor::visit(const peg::ast_node& operationDefinition)
10601065
throw schema_exception({ error.str() });
10611066
}
10621067

1063-
_result = std::async(std::launch::deferred,
1064-
[this, &operationDefinition](std::shared_ptr<RequestState> state, std::shared_ptr<Object> operationObject)
1065-
{
1066-
response::Value operationVariables(response::Type::Map);
1068+
response::Value operationVariables(response::Type::Map);
10671069

1068-
peg::on_first_child<peg::variable_definitions>(operationDefinition,
1069-
[this, &operationVariables](const peg::ast_node& child)
1070+
peg::on_first_child<peg::variable_definitions>(operationDefinition,
1071+
[this, &operationVariables](const peg::ast_node& child)
1072+
{
1073+
peg::for_each_child<peg::variable>(child,
1074+
[this, &operationVariables](const peg::ast_node& variable)
10701075
{
1071-
peg::for_each_child<peg::variable>(child,
1072-
[this, &operationVariables](const peg::ast_node& variable)
1073-
{
1074-
std::string variableName;
1076+
std::string variableName;
10751077

1076-
peg::on_first_child<peg::variable_name>(variable,
1077-
[&variableName](const peg::ast_node& name)
1078-
{
1079-
// Skip the $ prefix
1080-
variableName = name.content().c_str() + 1;
1081-
});
1078+
peg::on_first_child<peg::variable_name>(variable,
1079+
[&variableName](const peg::ast_node& name)
1080+
{
1081+
// Skip the $ prefix
1082+
variableName = name.content().c_str() + 1;
1083+
});
10821084

1083-
auto itrVar = _variables.find(variableName);
1084-
response::Value valueVar;
1085+
auto itrVar = _variables.find(variableName);
1086+
response::Value valueVar;
10851087

1086-
if (itrVar != _variables.get<const response::MapType&>().cend())
1087-
{
1088-
valueVar = response::Value(itrVar->second);
1089-
}
1090-
else
1088+
if (itrVar != _variables.get<const response::MapType&>().cend())
1089+
{
1090+
valueVar = response::Value(itrVar->second);
1091+
}
1092+
else
1093+
{
1094+
peg::on_first_child<peg::default_value>(variable,
1095+
[this, &valueVar](const peg::ast_node& defaultValue)
10911096
{
1092-
peg::on_first_child<peg::default_value>(variable,
1093-
[this, &valueVar](const peg::ast_node& defaultValue)
1094-
{
1095-
ValueVisitor visitor(_variables);
1097+
ValueVisitor visitor(_variables);
10961098

1097-
visitor.visit(*defaultValue.children.front());
1098-
valueVar = visitor.getValue();
1099-
});
1100-
}
1099+
visitor.visit(*defaultValue.children.front());
1100+
valueVar = visitor.getValue();
1101+
});
1102+
}
11011103

1102-
operationVariables.emplace_back(std::move(variableName), std::move(valueVar));
1103-
});
1104+
operationVariables.emplace_back(std::move(variableName), std::move(valueVar));
11041105
});
1106+
});
11051107

1108+
_result = std::async(std::launch::deferred,
1109+
[](std::future<response::Value> data)
1110+
{
11061111
response::Value document(response::Type::Map);
1107-
auto data = operationObject->resolve(state, *operationDefinition.children.back(), _fragments, operationVariables);
11081112

11091113
document.emplace_back("data", data.get());
11101114

11111115
return document;
1112-
}, _state, itr->second);
1116+
}, itr->second->resolve(_state, *operationDefinition.children.back(), _fragments, operationVariables));
11131117
}
11141118
catch (const schema_exception& ex)
11151119
{
@@ -1124,6 +1128,138 @@ void OperationDefinitionVisitor::visit(const peg::ast_node& operationDefinition)
11241128
}
11251129
}
11261130

1131+
// SubscriptionDefinitionVisitor visits the AST collects the fields referenced in the subscription at the point
1132+
// where we create a subscription.
1133+
class SubscriptionDefinitionVisitor
1134+
{
1135+
public:
1136+
SubscriptionDefinitionVisitor(SubscriptionParams&& params, SubscriptionCallback&& callback, FragmentMap&& fragments);
1137+
1138+
const peg::ast_node& getRoot() const;
1139+
SubscriptionRegistration getRegistration();
1140+
1141+
void visit(const peg::ast_node& operationDefinition);
1142+
1143+
private:
1144+
SubscriptionParams _params;
1145+
SubscriptionCallback _callback;
1146+
FragmentMap _fragments;
1147+
std::unique_ptr<SubscriptionRegistration> _result;
1148+
};
1149+
1150+
SubscriptionDefinitionVisitor::SubscriptionDefinitionVisitor(SubscriptionParams&& params, SubscriptionCallback&& callback, FragmentMap&& fragments)
1151+
: _params(std::move(params))
1152+
, _callback(std::move(callback))
1153+
, _fragments(std::move(fragments))
1154+
{
1155+
}
1156+
1157+
const peg::ast_node& SubscriptionDefinitionVisitor::getRoot() const
1158+
{
1159+
return *_params.query->root;
1160+
}
1161+
1162+
SubscriptionRegistration SubscriptionDefinitionVisitor::getRegistration()
1163+
{
1164+
if (!_result)
1165+
{
1166+
std::ostringstream error;
1167+
1168+
error << "Missing operation";
1169+
1170+
if (!_params.operationName.empty())
1171+
{
1172+
error << " name: " << _params.operationName;
1173+
}
1174+
1175+
throw schema_exception({ error.str() });
1176+
}
1177+
1178+
auto result = std::move(*_result);
1179+
1180+
_result.reset();
1181+
1182+
return result;
1183+
}
1184+
1185+
void SubscriptionDefinitionVisitor::visit(const peg::ast_node& operationDefinition)
1186+
{
1187+
std::string operation;
1188+
1189+
peg::on_first_child<peg::operation_type>(operationDefinition,
1190+
[&operation](const peg::ast_node& child)
1191+
{
1192+
operation = child.content();
1193+
});
1194+
1195+
if (operation != "subscription")
1196+
{
1197+
// Skip operations other than subscription.
1198+
return;
1199+
}
1200+
1201+
auto position = operationDefinition.begin();
1202+
std::string name;
1203+
1204+
peg::on_first_child<peg::operation_name>(operationDefinition,
1205+
[&name](const peg::ast_node& child)
1206+
{
1207+
name = child.content();
1208+
});
1209+
1210+
if (!_params.operationName.empty()
1211+
&& name != _params.operationName)
1212+
{
1213+
// Skip the subscriptions that don't match the name
1214+
return;
1215+
}
1216+
1217+
if (_result)
1218+
{
1219+
std::ostringstream error;
1220+
1221+
if (_params.operationName.empty())
1222+
{
1223+
error << "No operationName specified with extra subscription";
1224+
}
1225+
else
1226+
{
1227+
error << "Duplicate subscription";
1228+
}
1229+
1230+
if (!name.empty())
1231+
{
1232+
error << " name: " << name;
1233+
}
1234+
1235+
error << " line: " << position.line
1236+
<< " column: " << position.byte_in_line;
1237+
1238+
throw schema_exception({ error.str() });
1239+
}
1240+
1241+
const auto& selection = *operationDefinition.children.back();
1242+
std::unordered_set<SubscriptionName> fieldNames;
1243+
1244+
peg::for_each_child<peg::field>(selection,
1245+
[this, &fieldNames](const peg::ast_node& field)
1246+
{
1247+
peg::on_first_child<peg::field_name>(field,
1248+
[&fieldNames](const peg::ast_node& child)
1249+
{
1250+
fieldNames.insert(child.content());
1251+
});
1252+
});
1253+
1254+
_result.reset(new SubscriptionRegistration {
1255+
std::move(_params),
1256+
std::move(_callback),
1257+
selection,
1258+
std::move(fieldNames),
1259+
std::move(_fragments)
1260+
});
1261+
}
1262+
11271263
Request::Request(TypeMap&& operationTypes)
11281264
: _operations(std::move(operationTypes))
11291265
{
@@ -1151,6 +1287,112 @@ std::future<response::Value> Request::resolve(const std::shared_ptr<RequestState
11511287
return operationVisitor.getValue();
11521288
}
11531289

1290+
SubscriptionKey Request::subscribe(SubscriptionParams&& params, SubscriptionCallback&& callback)
1291+
{
1292+
FragmentDefinitionVisitor fragmentVisitor;
1293+
1294+
peg::for_each_child<peg::fragment_definition>(*params.query->root,
1295+
[&fragmentVisitor](const peg::ast_node& child)
1296+
{
1297+
fragmentVisitor.visit(child);
1298+
});
1299+
1300+
auto fragments = fragmentVisitor.getFragments();
1301+
SubscriptionDefinitionVisitor subscriptionVisitor(std::move(params), std::move(callback), std::move(fragments));
1302+
1303+
peg::for_each_child<peg::operation_definition>(subscriptionVisitor.getRoot(),
1304+
[&subscriptionVisitor](const peg::ast_node& child)
1305+
{
1306+
subscriptionVisitor.visit(child);
1307+
});
1308+
1309+
auto registration = subscriptionVisitor.getRegistration();
1310+
auto key = _nextKey++;
1311+
1312+
for (const auto& name : registration.fieldNames)
1313+
{
1314+
_listeners[name].insert(key);
1315+
}
1316+
1317+
_subscriptions.emplace(key, std::move(registration));
1318+
1319+
return key;
1320+
}
1321+
1322+
void Request::unsubscribe(SubscriptionKey key)
1323+
{
1324+
auto itrSubscription = _subscriptions.find(key);
1325+
1326+
if (itrSubscription == _subscriptions.cend())
1327+
{
1328+
return;
1329+
}
1330+
1331+
for (const auto& name : itrSubscription->second.fieldNames)
1332+
{
1333+
auto itrListener = _listeners.find(name);
1334+
1335+
itrListener->second.erase(key);
1336+
if (itrListener->second.empty())
1337+
{
1338+
_listeners.erase(itrListener);
1339+
}
1340+
}
1341+
1342+
_subscriptions.erase(itrSubscription);
1343+
1344+
if (_subscriptions.empty())
1345+
{
1346+
_nextKey = 0;
1347+
}
1348+
else
1349+
{
1350+
_nextKey = _subscriptions.crbegin()->first + 1;
1351+
}
1352+
}
1353+
1354+
void Request::deliver(const SubscriptionName& name, const std::shared_ptr<Object>& subscriptionObject) const
1355+
{
1356+
auto itrListeners = _listeners.find(name);
1357+
1358+
if (itrListeners == _listeners.cend())
1359+
{
1360+
return;
1361+
}
1362+
1363+
for (const auto& key : itrListeners->second)
1364+
{
1365+
auto itrSubscription = _subscriptions.find(key);
1366+
const auto& registration = itrSubscription->second;
1367+
std::future<response::Value> result;
1368+
1369+
try
1370+
{
1371+
result = std::async(std::launch::deferred,
1372+
[](std::future<response::Value> data)
1373+
{
1374+
response::Value document(response::Type::Map);
1375+
1376+
document.emplace_back("data", data.get());
1377+
1378+
return document;
1379+
}, subscriptionObject->resolve(registration.params.state, registration.selection, registration.fragments, registration.params.variables));
1380+
}
1381+
catch (const schema_exception& ex)
1382+
{
1383+
std::promise<response::Value> promise;
1384+
response::Value document(response::Type::Map);
1385+
1386+
document.emplace_back("data", response::Value());
1387+
document.emplace_back("errors", response::Value(ex.getErrors()));
1388+
1389+
result = promise.get_future();
1390+
}
1391+
1392+
registration.callback(std::move(result));
1393+
}
1394+
}
1395+
11541396
} /* namespace service */
11551397
} /* namespace graphql */
11561398
} /* namespace facebook */

include/Today.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,13 +481,32 @@ class Subscription : public object::Subscription
481481
explicit Subscription() = default;
482482

483483
std::future<std::shared_ptr<object::Appointment>> getNextAppointmentChange(const std::shared_ptr<service::RequestState>&) const override
484+
{
485+
throw std::runtime_error("Unexpected call to getNextAppointmentChange");
486+
}
487+
};
488+
489+
class NextAppointmentChange : public object::Subscription
490+
{
491+
public:
492+
using nextAppointmentChange = std::function<std::shared_ptr<Appointment>(const std::shared_ptr<service::RequestState>&)>;
493+
494+
explicit NextAppointmentChange(nextAppointmentChange&& changeNextAppointment)
495+
: _changeNextAppointment(std::move(changeNextAppointment))
496+
{
497+
}
498+
499+
std::future<std::shared_ptr<object::Appointment>> getNextAppointmentChange(const std::shared_ptr<service::RequestState>& state) const override
484500
{
485501
std::promise<std::shared_ptr<object::Appointment>> promise;
486502

487-
promise.set_value(nullptr);
503+
promise.set_value(std::static_pointer_cast<object::Appointment>(_changeNextAppointment(state)));
488504

489505
return promise.get_future();
490506
}
507+
508+
private:
509+
nextAppointmentChange _changeNextAppointment;
491510
};
492511

493512
} /* namespace today */

0 commit comments

Comments
 (0)