@@ -1264,20 +1264,6 @@ class OperationDefinitionVisitor
1264
1264
std::optional<AwaitableResolver> _result;
1265
1265
};
1266
1266
1267
- SubscriptionData::SubscriptionData (std::shared_ptr<OperationData> data, SubscriptionName&& field,
1268
- response::Value arguments, Directives fieldDirectives, peg::ast&& query,
1269
- std::string&& operationName, SubscriptionCallback&& callback, const peg::ast_node& selection)
1270
- : data(std::move(data))
1271
- , field(std::move(field))
1272
- , arguments(std::move(arguments))
1273
- , fieldDirectives(std::move(fieldDirectives))
1274
- , query(std::move(query))
1275
- , operationName(std::move(operationName))
1276
- , callback(std::move(callback))
1277
- , selection(selection)
1278
- {
1279
- }
1280
-
1281
1267
OperationDefinitionVisitor::OperationDefinitionVisitor (ResolverContext resolverContext,
1282
1268
await_async launch, std::shared_ptr<RequestState> state, const TypeMap& operations,
1283
1269
response::Value&& variables, FragmentMap&& fragments)
@@ -1372,6 +1358,20 @@ void OperationDefinitionVisitor::visit(
1372
1358
_params->variables ));
1373
1359
}
1374
1360
1361
+ SubscriptionData::SubscriptionData (std::shared_ptr<OperationData> data, SubscriptionName&& field,
1362
+ response::Value arguments, Directives fieldDirectives, peg::ast&& query,
1363
+ std::string&& operationName, SubscriptionCallback&& callback, const peg::ast_node& selection)
1364
+ : data(std::move(data))
1365
+ , field(std::move(field))
1366
+ , arguments(std::move(arguments))
1367
+ , fieldDirectives(std::move(fieldDirectives))
1368
+ , query(std::move(query))
1369
+ , operationName(std::move(operationName))
1370
+ , callback(std::move(callback))
1371
+ , selection(selection)
1372
+ {
1373
+ }
1374
+
1375
1375
// SubscriptionDefinitionVisitor visits the AST collects the fields referenced in the subscription
1376
1376
// at the point where we create a subscription.
1377
1377
class SubscriptionDefinitionVisitor
@@ -1609,6 +1609,8 @@ std::list<schema_error> Request::validate(peg::ast& query) const
1609
1609
1610
1610
if (!query.validated )
1611
1611
{
1612
+ const std::lock_guard lock { _validationMutex };
1613
+
1612
1614
_validation->visit (*query.root );
1613
1615
errors = _validation->getStructuredErrors ();
1614
1616
query.validated = errors.empty ();
@@ -1746,14 +1748,14 @@ response::AwaitableValue Request::resolve(RequestResolveParams params) const
1746
1748
AwaitableSubscribe Request::subscribe (RequestSubscribeParams params)
1747
1749
{
1748
1750
const auto spThis = shared_from_this ();
1749
- auto launch = params. launch ;
1751
+ std::unique_lock lock { spThis-> _subscriptionMutex } ;
1750
1752
const auto key = spThis->addSubscription (std::move (params));
1751
1753
const auto itrOperation = spThis->_operations .find (strSubscription);
1752
1754
1753
1755
if (itrOperation != spThis->_operations .end ())
1754
1756
{
1755
- const auto & operation = itrOperation->second ;
1756
- const auto & registration = spThis->_subscriptions .at (key);
1757
+ const auto operation = itrOperation->second ;
1758
+ const auto registration = spThis->_subscriptions .at (key);
1757
1759
const SelectionSetParams selectionSetParams {
1758
1760
ResolverContext::NotifySubscribe,
1759
1761
registration->data ->state ,
@@ -1762,19 +1764,23 @@ AwaitableSubscribe Request::subscribe(RequestSubscribeParams params)
1762
1764
std::make_shared<FragmentSpreadDirectiveStack>(),
1763
1765
std::make_shared<FragmentSpreadDirectiveStack>(),
1764
1766
{},
1765
- launch,
1767
+ params. launch ,
1766
1768
};
1767
1769
1770
+ lock.unlock ();
1771
+
1768
1772
try
1769
1773
{
1770
- co_await launch;
1774
+ co_await params. launch ;
1771
1775
co_await operation->resolve (selectionSetParams,
1772
1776
registration->selection ,
1773
1777
registration->data ->fragments ,
1774
1778
registration->data ->variables );
1775
1779
}
1776
1780
catch (const std::exception& ex)
1777
1781
{
1782
+ lock.lock ();
1783
+
1778
1784
// Rethrow the exception, but don't leave it subscribed if the resolver failed.
1779
1785
spThis->removeSubscription (key);
1780
1786
throw ex;
@@ -1787,12 +1793,13 @@ AwaitableSubscribe Request::subscribe(RequestSubscribeParams params)
1787
1793
AwaitableUnsubscribe Request::unsubscribe (RequestUnsubscribeParams params)
1788
1794
{
1789
1795
const auto spThis = shared_from_this ();
1796
+ std::unique_lock lock { spThis->_subscriptionMutex };
1790
1797
const auto itrOperation = spThis->_operations .find (strSubscription);
1791
1798
1792
1799
if (itrOperation != spThis->_operations .end ())
1793
1800
{
1794
- const auto & operation = itrOperation->second ;
1795
- const auto & registration = spThis->_subscriptions .at (params.key );
1801
+ const auto operation = itrOperation->second ;
1802
+ const auto registration = spThis->_subscriptions .at (params.key );
1796
1803
const SelectionSetParams selectionSetParams {
1797
1804
ResolverContext::NotifyUnsubscribe,
1798
1805
registration->data ->state ,
@@ -1804,11 +1811,15 @@ AwaitableUnsubscribe Request::unsubscribe(RequestUnsubscribeParams params)
1804
1811
params.launch ,
1805
1812
};
1806
1813
1814
+ lock.unlock ();
1815
+
1807
1816
co_await params.launch ;
1808
1817
co_await operation->resolve (selectionSetParams,
1809
1818
registration->selection ,
1810
1819
registration->data ->fragments ,
1811
1820
registration->data ->variables );
1821
+
1822
+ lock.lock ();
1812
1823
}
1813
1824
1814
1825
spThis->removeSubscription (params.key );
@@ -1990,6 +2001,7 @@ std::vector<std::shared_ptr<SubscriptionData>> Request::collectRegistrations(
1990
2001
std::string_view field, RequestDeliverFilter&& filter) const noexcept
1991
2002
{
1992
2003
std::vector<std::shared_ptr<SubscriptionData>> registrations;
2004
+ const std::lock_guard lock { _subscriptionMutex };
1993
2005
const auto itrListeners = _listeners.find (field);
1994
2006
1995
2007
if (itrListeners != _listeners.end ())
0 commit comments