Skip to content

Commit 658bc81

Browse files
authored
Merge pull request #317 from wravery/next
feat(coro): update proxy client sample and improve async resolver throughput
2 parents 405df2f + be11263 commit 658bc81

16 files changed

+458
-74
lines changed

include/graphqlservice/GraphQLService.h

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -342,40 +342,14 @@ class [[nodiscard("unnecessary construction")]] AwaitableScalar
342342
std::promise<T> _promise;
343343
};
344344

345-
[[nodiscard("unexpected call")]] bool await_ready() const noexcept
345+
[[nodiscard("unexpected call")]] constexpr bool await_ready() const noexcept
346346
{
347-
return std::visit(
348-
[](const auto& value) noexcept {
349-
using value_type = std::decay_t<decltype(value)>;
350-
351-
if constexpr (std::is_same_v<value_type, T>)
352-
{
353-
return true;
354-
}
355-
else if constexpr (std::is_same_v<value_type, std::future<T>>)
356-
{
357-
using namespace std::literals;
358-
359-
return value.wait_for(0s) != std::future_status::timeout;
360-
}
361-
else if constexpr (std::is_same_v<value_type,
362-
std::shared_ptr<const response::Value>>)
363-
{
364-
return true;
365-
}
366-
},
367-
_value);
347+
return true;
368348
}
369349

370350
void await_suspend(std::coroutine_handle<> h) const
371351
{
372-
std::thread(
373-
[this](std::coroutine_handle<> h) noexcept {
374-
std::get<std::future<T>>(_value).wait();
375-
h.resume();
376-
},
377-
std::move(h))
378-
.detach();
352+
h.resume();
379353
}
380354

381355
[[nodiscard("unnecessary construction")]] T await_resume()
@@ -472,35 +446,14 @@ class [[nodiscard("unnecessary construction")]] AwaitableObject
472446
std::promise<T> _promise;
473447
};
474448

475-
[[nodiscard("unexpected call")]] bool await_ready() const noexcept
449+
[[nodiscard("unexpected call")]] constexpr bool await_ready() const noexcept
476450
{
477-
return std::visit(
478-
[](const auto& value) noexcept {
479-
using value_type = std::decay_t<decltype(value)>;
480-
481-
if constexpr (std::is_same_v<value_type, T>)
482-
{
483-
return true;
484-
}
485-
else if constexpr (std::is_same_v<value_type, std::future<T>>)
486-
{
487-
using namespace std::literals;
488-
489-
return value.wait_for(0s) != std::future_status::timeout;
490-
}
491-
},
492-
_value);
451+
return true;
493452
}
494453

495454
void await_suspend(std::coroutine_handle<> h) const
496455
{
497-
std::thread(
498-
[this](std::coroutine_handle<> h) noexcept {
499-
std::get<std::future<T>>(_value).wait();
500-
h.resume();
501-
},
502-
std::move(h))
503-
.detach();
456+
h.resume();
504457
}
505458

506459
[[nodiscard("unnecessary construction")]] T await_resume()

samples/proxy/client.cpp

Lines changed: 121 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include "schema/ProxySchema.h"
77
#include "schema/QueryObject.h"
8+
#include "schema/ResultsObject.h"
89

910
#include "graphqlservice/JSONResponse.h"
1011

@@ -21,6 +22,7 @@
2122
#include <boost/asio/use_awaitable.hpp>
2223
#include <boost/asio/use_future.hpp>
2324

25+
#include <algorithm>
2426
#include <cstdio>
2527
#include <cstdlib>
2628
#include <functional>
@@ -45,13 +47,111 @@ constexpr auto c_port = "8080"sv;
4547
constexpr auto c_target = "/graphql"sv;
4648
constexpr int c_version = 11; // HTTP 1.1
4749

50+
struct AsyncIoWorker : service::RequestState
51+
{
52+
AsyncIoWorker()
53+
: worker { std::make_shared<service::await_worker_thread>() }
54+
{
55+
}
56+
57+
const service::await_async worker;
58+
};
59+
60+
class Results
61+
{
62+
public:
63+
explicit Results(response::Value&& data, std::vector<client::Error> errors) noexcept;
64+
65+
service::AwaitableScalar<std::optional<std::string>> getData(
66+
service::FieldParams&& fieldParams) const;
67+
service::AwaitableScalar<std::optional<std::vector<std::optional<std::string>>>> getErrors(
68+
service::FieldParams&& fieldParams) const;
69+
70+
private:
71+
mutable response::Value m_data;
72+
mutable std::vector<client::Error> m_errors;
73+
};
74+
75+
Results::Results(response::Value&& data, std::vector<client::Error> errors) noexcept
76+
: m_data { std::move(data) }
77+
, m_errors { std::move(errors) }
78+
{
79+
}
80+
81+
service::AwaitableScalar<std::optional<std::string>> Results::getData(
82+
service::FieldParams&& fieldParams) const
83+
{
84+
auto asyncIoWorker = std::static_pointer_cast<AsyncIoWorker>(fieldParams.state);
85+
auto data = std::move(m_data);
86+
87+
// Jump to a worker thread for the resolver where we can run a separate I/O context without
88+
// blocking the I/O context in Query::getRelay. This simulates how you might fan out to
89+
// additional async I/O tasks for sub-field resolvers.
90+
co_await asyncIoWorker->worker;
91+
92+
net::io_context ioc;
93+
auto future = net::co_spawn(
94+
ioc,
95+
[](response::Value&& data) -> net::awaitable<std::optional<std::string>> {
96+
co_return (data.type() == response::Type::Null)
97+
? std::nullopt
98+
: std::make_optional(response::toJSON(std::move(data)));
99+
}(std::move(data)),
100+
net::use_future);
101+
102+
ioc.run();
103+
104+
co_return future.get();
105+
}
106+
107+
service::AwaitableScalar<std::optional<std::vector<std::optional<std::string>>>> Results::getErrors(
108+
service::FieldParams&& fieldParams) const
109+
{
110+
auto asyncIoWorker = std::static_pointer_cast<AsyncIoWorker>(fieldParams.state);
111+
auto errors = std::move(m_errors);
112+
113+
// Jump to a worker thread for the resolver where we can run a separate I/O context without
114+
// blocking the I/O context in Query::getRelay. This simulates how you might fan out to
115+
// additional async I/O tasks for sub-field resolvers.
116+
co_await asyncIoWorker->worker;
117+
118+
net::io_context ioc;
119+
auto future = net::co_spawn(
120+
ioc,
121+
[](std::vector<client::Error> errors)
122+
-> net::awaitable<std::optional<std::vector<std::optional<std::string>>>> {
123+
if (errors.empty())
124+
{
125+
co_return std::nullopt;
126+
}
127+
128+
std::vector<std::optional<std::string>> results { errors.size() };
129+
130+
std::transform(errors.begin(),
131+
errors.end(),
132+
results.begin(),
133+
[](auto& error) noexcept -> std::optional<std::string> {
134+
return error.message.empty()
135+
? std::nullopt
136+
: std::make_optional<std::string>(std::move(error.message));
137+
});
138+
139+
co_return std::make_optional(results);
140+
}(std::move(errors)),
141+
net::use_future);
142+
143+
ioc.run();
144+
145+
co_return future.get();
146+
}
147+
48148
class Query
49149
{
50150
public:
51151
explicit Query(std::string_view host, std::string_view port, std::string_view target,
52152
int version) noexcept;
53153

54-
std::future<std::optional<std::string>> getRelay(std::string&& queryArg,
154+
std::future<std::shared_ptr<proxy::object::Results>> getRelay(std::string&& queryArg,
55155
std::optional<std::string>&& operationNameArg,
56156
std::optional<std::string>&& variablesArg) const;
57157

@@ -73,7 +173,7 @@ Query::Query(
73173

74174
// Based on:
75175
// https://www.boost.org/doc/libs/1_82_0/libs/beast/example/http/client/awaitable/http_client_awaitable.cpp
76-
std::future<std::optional<std::string>> Query::getRelay(std::string&& queryArg,
176+
std::future<std::shared_ptr<proxy::object::Results>> Query::getRelay(std::string&& queryArg,
77177
std::optional<std::string>&& operationNameArg, std::optional<std::string>&& variablesArg) const
78178
{
79179
response::Value payload { response::Type::Map };
@@ -99,7 +199,7 @@ std::future<std::optional<std::string>> Query::getRelay(std::string&& queryArg,
99199
const char* port,
100200
const char* target,
101201
int version,
102-
std::string requestBody) -> net::awaitable<std::optional<std::string>> {
202+
std::string requestBody) -> net::awaitable<std::shared_ptr<proxy::object::Results>> {
103203
// These objects perform our I/O. They use an executor with a default completion token
104204
// of use_awaitable. This makes our code easy, but will use exceptions as the default
105205
// error handling, i.e. if the connection drops, we might see an exception.
@@ -150,7 +250,10 @@ std::future<std::optional<std::string>> Query::getRelay(std::string&& queryArg,
150250
throw boost::system::system_error(ec, "shutdown");
151251
}
152252

153-
co_return std::make_optional<std::string>(std::move(res.body()));
253+
auto [data, errors] = client::parseServiceResponse(response::parseJSON(res.body()));
254+
255+
co_return std::make_shared<proxy::object::Results>(
256+
std::make_shared<Results>(std::move(data), std::move(errors)));
154257
}(m_host.c_str(), m_port.c_str(), m_target.c_str(), m_version, std::move(requestBody)),
155258
net::use_future);
156259

@@ -179,14 +282,25 @@ int main(int argc, char** argv)
179282
auto variables = serializeVariables(
180283
{ input, ((argc > 1) ? std::make_optional(argv[1]) : std::nullopt) });
181284
auto launch = service::await_async { std::make_shared<service::await_worker_queue>() };
285+
auto state = std::make_shared<AsyncIoWorker>();
182286
auto serviceResponse = client::parseServiceResponse(
183-
service->resolve({ query, GetOperationName(), std::move(variables), launch }).get());
287+
service->resolve({ query, GetOperationName(), std::move(variables), launch, state })
288+
.get());
184289
auto result = client::query::relayQuery::parseResponse(std::move(serviceResponse.data));
185290
auto errors = std::move(serviceResponse.errors);
186291

187-
if (result.relay)
292+
if (result.relay.data)
293+
{
294+
std::cout << "Data: " << *result.relay.data << std::endl;
295+
}
296+
297+
if (result.relay.errors)
188298
{
189-
std::cout << *result.relay << std::endl;
299+
for (const auto& message : *result.relay.errors)
300+
{
301+
std::cerr << "Remote Error: "
302+
<< (message ? std::string_view { *message } : "<empty>"sv) << std::endl;
303+
}
190304
}
191305

192306
if (!errors.empty())

samples/proxy/query/ProxyClient.cpp

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ const std::string& GetRequestText() noexcept
2626
# Licensed under the MIT License.
2727
2828
query relayQuery($query: String!, $operationName: String, $variables: String) {
29-
relay(query: $query, operationName: $operationName, variables: $variables)
29+
relay(query: $query, operationName: $operationName, variables: $variables) {
30+
data
31+
errors
32+
}
3033
}
3134
)gql"s;
3235

@@ -51,6 +54,33 @@ const peg::ast& GetRequestObject() noexcept
5154

5255
using namespace proxy;
5356

57+
template <>
58+
query::relayQuery::Response::relay_Results Response<query::relayQuery::Response::relay_Results>::parse(response::Value&& response)
59+
{
60+
query::relayQuery::Response::relay_Results result;
61+
62+
if (response.type() == response::Type::Map)
63+
{
64+
auto members = response.release<response::MapType>();
65+
66+
for (auto& member : members)
67+
{
68+
if (member.first == R"js(data)js"sv)
69+
{
70+
result.data = ModifiedResponse<std::string>::parse<TypeModifier::Nullable>(std::move(member.second));
71+
continue;
72+
}
73+
if (member.first == R"js(errors)js"sv)
74+
{
75+
result.errors = ModifiedResponse<std::string>::parse<TypeModifier::Nullable, TypeModifier::List, TypeModifier::Nullable>(std::move(member.second));
76+
continue;
77+
}
78+
}
79+
}
80+
81+
return result;
82+
}
83+
5484
namespace query::relayQuery {
5585

5686
const std::string& GetOperationName() noexcept
@@ -83,7 +113,7 @@ Response parseResponse(response::Value&& response)
83113
{
84114
if (member.first == R"js(relay)js"sv)
85115
{
86-
result.relay = ModifiedResponse<std::string>::parse<TypeModifier::Nullable>(std::move(member.second));
116+
result.relay = ModifiedResponse<query::relayQuery::Response::relay_Results>::parse(std::move(member.second));
87117
continue;
88118
}
89119
}

samples/proxy/query/ProxyClient.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ namespace graphql::client {
3232
/// # Licensed under the MIT License.
3333
///
3434
/// query relayQuery($query: String!, $operationName: String, $variables: String) {
35-
/// relay(query: $query, operationName: $operationName, variables: $variables)
35+
/// relay(query: $query, operationName: $operationName, variables: $variables) {
36+
/// data
37+
/// errors
38+
/// }
3639
/// }
3740
/// </code>
3841
namespace proxy {
@@ -64,7 +67,13 @@ struct [[nodiscard("unnecessary construction")]] Variables
6467

6568
struct [[nodiscard("unnecessary construction")]] Response
6669
{
67-
std::optional<std::string> relay {};
70+
struct [[nodiscard("unnecessary construction")]] relay_Results
71+
{
72+
std::optional<std::string> data {};
73+
std::optional<std::vector<std::optional<std::string>>> errors {};
74+
};
75+
76+
relay_Results relay {};
6877
};
6978

7079
[[nodiscard("unnecessary conversion")]] Response parseResponse(response::Value&& response);

samples/proxy/query/query.graphql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,8 @@
22
# Licensed under the MIT License.
33

44
query relayQuery($query: String!, $operationName: String, $variables: String) {
5-
relay(query: $query, operationName: $operationName, variables: $variables)
5+
relay(query: $query, operationName: $operationName, variables: $variables) {
6+
data
7+
errors
8+
}
69
}

samples/proxy/schema/ProxySchema.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,11 @@ void AddTypesToSchema(const std::shared_ptr<schema::Schema>& schema)
3535
{
3636
auto typeQuery = schema::ObjectType::Make(R"gql(Query)gql"sv, R"md()md"sv);
3737
schema->AddType(R"gql(Query)gql"sv, typeQuery);
38+
auto typeResults = schema::ObjectType::Make(R"gql(Results)gql"sv, R"md()md"sv);
39+
schema->AddType(R"gql(Results)gql"sv, typeResults);
3840

3941
AddQueryDetails(typeQuery, schema);
42+
AddResultsDetails(typeResults, schema);
4043

4144
schema->AddQueryType(typeQuery);
4245
}

samples/proxy/schema/ProxySchema.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace proxy {
2828
namespace object {
2929

3030
class Query;
31+
class Results;
3132

3233
} // namespace object
3334

@@ -50,6 +51,7 @@ class [[nodiscard("unnecessary construction")]] Operations final
5051
};
5152

5253
void AddQueryDetails(const std::shared_ptr<schema::ObjectType>& typeQuery, const std::shared_ptr<schema::Schema>& schema);
54+
void AddResultsDetails(const std::shared_ptr<schema::ObjectType>& typeResults, const std::shared_ptr<schema::Schema>& schema);
5355

5456
std::shared_ptr<schema::Schema> GetSchema();
5557

0 commit comments

Comments
 (0)