Skip to content

Commit d07fcee

Browse files
authored
Added GraphQL SSE subgraph support to fusion. (#6356)
1 parent a45436d commit d07fcee

File tree

8 files changed

+383
-29
lines changed

8 files changed

+383
-29
lines changed

src/HotChocolate/AspNetCore/src/Transport.Http/GraphQLHttpEventStreamProcessor.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ private static async IAsyncEnumerable<OperationResult> ReadMessagesPipeAsync(
116116
yield break;
117117
}
118118

119+
message.Reset();
119120
yield return operationResult;
120121
}
121122

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System.Runtime.CompilerServices;
2+
using HotChocolate.Fusion.Metadata;
3+
using HotChocolate.Transport.Http;
4+
5+
namespace HotChocolate.Fusion.Clients;
6+
7+
internal sealed class DefaultHttpGraphQLSubscriptionClient : IGraphQLSubscriptionClient
8+
{
9+
private readonly HttpClientConfiguration _config;
10+
private readonly DefaultGraphQLHttpClient _client;
11+
12+
public DefaultHttpGraphQLSubscriptionClient(
13+
HttpClientConfiguration configuration,
14+
HttpClient httpClient)
15+
{
16+
if (httpClient == null)
17+
{
18+
throw new ArgumentNullException(nameof(httpClient));
19+
}
20+
21+
_config = configuration ?? throw new ArgumentNullException(nameof(configuration));
22+
_client = new DefaultGraphQLHttpClient(httpClient);
23+
}
24+
25+
public string SubgraphName => _config.SubgraphName;
26+
27+
public IAsyncEnumerable<GraphQLResponse> SubscribeAsync(
28+
SubgraphGraphQLRequest request,
29+
CancellationToken cancellationToken)
30+
{
31+
if (request is null)
32+
{
33+
throw new ArgumentNullException(nameof(request));
34+
}
35+
36+
return SubscribeInternalAsync(request, cancellationToken);
37+
}
38+
39+
private async IAsyncEnumerable<GraphQLResponse> SubscribeInternalAsync(
40+
SubgraphGraphQLRequest subgraphRequest,
41+
[EnumeratorCancellation] CancellationToken cancellationToken)
42+
{
43+
var request = new GraphQLHttpRequest(subgraphRequest, _config.EndpointUri);
44+
using var response = await _client.SendAsync(request, cancellationToken).ConfigureAwait(false);
45+
46+
await foreach (var result in response.ReadAsResultStreamAsync(cancellationToken).ConfigureAwait(false))
47+
{
48+
yield return new GraphQLResponse(result);
49+
}
50+
}
51+
52+
public ValueTask DisposeAsync()
53+
{
54+
_client.Dispose();
55+
return default;
56+
}
57+
}

src/HotChocolate/Fusion/src/Core/Clients/DefaultWebSocketGraphQLSubscriptionClientFactory.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,33 @@ namespace HotChocolate.Fusion.Clients;
66
internal sealed class DefaultWebSocketGraphQLSubscriptionClientFactory
77
: IGraphQLSubscriptionClientFactory
88
{
9+
private readonly IHttpClientFactory _httpClientFactory;
910
private readonly IWebSocketConnectionFactory _connectionFactory;
1011

1112
public DefaultWebSocketGraphQLSubscriptionClientFactory(
13+
IHttpClientFactory httpClientFactory,
1214
IWebSocketConnectionFactory connectionFactory)
1315
{
16+
_httpClientFactory = httpClientFactory ??
17+
throw new ArgumentNullException(nameof(httpClientFactory));
1418
_connectionFactory = connectionFactory ??
1519
throw new ArgumentNullException(nameof(connectionFactory));
1620
}
1721

1822
public IGraphQLSubscriptionClient CreateClient(IGraphQLClientConfiguration configuration)
1923
{
20-
if (configuration is not WebSocketClientConfiguration webSocketClientConfig)
24+
if (configuration is WebSocketClientConfiguration webSocketClientConfig)
2125
{
22-
throw new ArgumentException(TransportConfigurationNotSupported, nameof(configuration));
26+
var connection = _connectionFactory.CreateConnection(configuration.ClientName);
27+
return new DefaultWebSocketGraphQLSubscriptionClient(webSocketClientConfig, connection);
2328
}
2429

25-
var connection = _connectionFactory.CreateConnection(configuration.ClientName);
26-
return new DefaultWebSocketGraphQLSubscriptionClient(webSocketClientConfig, connection);
30+
if (configuration is HttpClientConfiguration httpClientConfig)
31+
{
32+
var httpClient = _httpClientFactory.CreateClient(httpClientConfig.ClientName);
33+
return new DefaultHttpGraphQLSubscriptionClient(httpClientConfig, httpClient);
34+
}
35+
36+
throw new ArgumentException(TransportConfigurationNotSupported, nameof(configuration));
2737
}
28-
}
38+
}

src/HotChocolate/Fusion/src/Core/DependencyInjection/FusionRequestExecutorBuilderExtensions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public static FusionGatewayBuilder AddFusionGatewayServer(
4747
sp.GetRequiredService<IHttpClientFactory>()));
4848
services.TryAddSingleton<IGraphQLSubscriptionClientFactory>(
4949
sp => new DefaultWebSocketGraphQLSubscriptionClientFactory(
50+
sp.GetRequiredService<IHttpClientFactory>(),
5051
sp.GetRequiredService<IWebSocketConnectionFactory>()));
5152

5253
var builder = services
@@ -385,6 +386,14 @@ IGraphQLSubscriptionClient Create(IGraphQLClientConfiguration clientConfig)
385386
{
386387
map2.Add(config.SubgraphName, () => Create(config));
387388
}
389+
390+
foreach (var config in fusionGraphConfig.HttpClients)
391+
{
392+
if (!map2.ContainsKey(config.SubgraphName))
393+
{
394+
map2.Add(config.SubgraphName, () => Create(config));
395+
}
396+
}
388397
}
389398

390399
return new GraphQLClientFactory(map1, map2);

src/HotChocolate/Fusion/test/Core.Tests/DemoIntegrationTests.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
using CookieCrumble;
22
using HotChocolate.Execution;
3-
using HotChocolate.Execution.Configuration;
43
using HotChocolate.Fusion.Composition;
54
using HotChocolate.Fusion.Composition.Features;
6-
using HotChocolate.Fusion.Planning;
75
using HotChocolate.Fusion.Shared;
8-
using HotChocolate.Language;
96
using HotChocolate.Skimmed.Serialization;
107
using HotChocolate.Types.Relay;
118
using Microsoft.Extensions.DependencyInjection;
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using CookieCrumble;
2+
using HotChocolate.Execution;
3+
using HotChocolate.Execution.Configuration;
4+
using HotChocolate.Fusion.Clients;
5+
using HotChocolate.Fusion.Composition;
6+
using HotChocolate.Fusion.Composition.Features;
7+
using HotChocolate.Fusion.Planning;
8+
using HotChocolate.Fusion.Shared;
9+
using HotChocolate.Language;
10+
using HotChocolate.Skimmed.Serialization;
11+
using HotChocolate.Types.Relay;
12+
using Microsoft.Extensions.DependencyInjection;
13+
using Xunit.Abstractions;
14+
using static HotChocolate.Fusion.Shared.DemoProjectSchemaExtensions;
15+
using static HotChocolate.Language.Utf8GraphQLParser;
16+
using static HotChocolate.Fusion.TestHelper;
17+
18+
namespace HotChocolate.Fusion;
19+
20+
public class EventStreamTests
21+
{
22+
private readonly Func<ICompositionLog> _logFactory;
23+
24+
public EventStreamTests(ITestOutputHelper output)
25+
{
26+
_logFactory = () => new TestCompositionLog(output);
27+
}
28+
29+
[Fact]
30+
public async Task Authors_And_Reviews_Subscription_OnNewReview()
31+
{
32+
// arrange
33+
using var cts = new CancellationTokenSource(10_000);
34+
using var demoProject = await DemoProject.CreateAsync(cts.Token);
35+
36+
// act
37+
var fusionGraph = await new FusionGraphComposer(logFactory: _logFactory).ComposeAsync(
38+
new[]
39+
{
40+
demoProject.Reviews2.ToConfiguration(Reviews2ExtensionSdl, onlyHttp: true),
41+
demoProject.Accounts.ToConfiguration(AccountsExtensionSdl, onlyHttp: true)
42+
},
43+
default,
44+
cts.Token);
45+
46+
var executor = await new ServiceCollection()
47+
.AddSingleton(demoProject.HttpClientFactory)
48+
.AddSingleton<IWebSocketConnectionFactory>(new NoWebSockets())
49+
.AddFusionGatewayServer()
50+
.ConfigureFromDocument(SchemaFormatter.FormatAsDocument(fusionGraph))
51+
.BuildRequestExecutorAsync(cancellationToken: cts.Token);
52+
53+
var request = Parse(
54+
"""
55+
subscription OnNewReview {
56+
onNewReview {
57+
body
58+
author {
59+
name
60+
}
61+
}
62+
}
63+
""");
64+
65+
// act
66+
var result = await executor.ExecuteAsync(
67+
QueryRequestBuilder
68+
.New()
69+
.SetQuery(request)
70+
.Create(),
71+
cts.Token);
72+
73+
// assert
74+
var snapshot = new Snapshot();
75+
await CollectStreamSnapshotData(snapshot, request, result, fusionGraph, cts.Token);
76+
await snapshot.MatchAsync(cts.Token);
77+
}
78+
79+
private sealed class NoWebSockets : IWebSocketConnectionFactory
80+
{
81+
public IWebSocketConnection CreateConnection(string name)
82+
{
83+
throw new NotSupportedException();
84+
}
85+
}
86+
}

0 commit comments

Comments
 (0)