Skip to content

Commit 519f495

Browse files
authored
Simplified Subscription Provider (#6377)
1 parent a484c30 commit 519f495

File tree

13 files changed

+250
-451
lines changed

13 files changed

+250
-451
lines changed

src/HotChocolate/AspNetCore/src/AspNetCore/Serialization/DefaultHttpResponseFormatter.cs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,14 @@ public DefaultHttpResponseFormatter(
4747
: this(
4848
new HttpResponseFormatterOptions
4949
{
50-
Json = new JsonResultFormatterOptions { Indented = indented, Encoder = encoder }
51-
}) { }
50+
Json = new JsonResultFormatterOptions
51+
{
52+
Indented = indented,
53+
Encoder = encoder
54+
}
55+
})
56+
{
57+
}
5258

5359
/// <summary>
5460
/// Creates a new instance of <see cref="DefaultHttpResponseFormatter" />.
@@ -168,7 +174,7 @@ public async ValueTask FormatAsync(
168174
#if NET6_0_OR_GREATER
169175
response.Headers.CacheControl = cacheControlHeaderValue;
170176
#else
171-
response.Headers.Add(HeaderNames.CacheControl, cacheControlHeaderValue);
177+
response.Headers[HttpHeaderKeys.CacheControl] = cacheControlHeaderValue;
172178
#endif
173179
}
174180

@@ -183,8 +189,13 @@ public async ValueTask FormatAsync(
183189

184190
response.ContentType = format.ContentType;
185191
response.StatusCode = statusCode;
186-
187-
response.Headers.Add(HttpHeaderKeys.CacheControl, HttpHeaderValues.NoCache);
192+
193+
#if NET6_0_OR_GREATER
194+
response.Headers.CacheControl = HttpHeaderValues.NoCache;
195+
#else
196+
response.Headers[HttpHeaderKeys.CacheControl] = HttpHeaderValues.NoCache;
197+
#endif
198+
188199
OnWriteResponseHeaders(responseStream, format, response.Headers);
189200

190201
await response.Body.FlushAsync(cancellationToken);

src/HotChocolate/AspNetCore/test/AspNetCore.Tests/Subscriptions/GraphQLOverWebSocket/WebSocketProtocolTests.cs

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Diagnostics;
12
using CookieCrumble;
23
using HotChocolate.AspNetCore.Subscriptions.Protocols;
34
using HotChocolate.AspNetCore.Subscriptions.Protocols.GraphQLOverWebSocket;
@@ -112,8 +113,7 @@ public Task No_ConnectionInit_Timeout()
112113
{
113114
Sockets =
114115
{
115-
ConnectionInitializationTimeout =
116-
TimeSpan.FromMilliseconds(50),
116+
ConnectionInitializationTimeout = TimeSpan.FromMilliseconds(50),
117117
KeepAliveInterval = TimeSpan.FromMilliseconds(150)
118118
}
119119
}));
@@ -246,7 +246,14 @@ public Task Subscribe_ReceiveDataOnMutation()
246246
async ct =>
247247
{
248248
// arrange
249-
using var testServer = CreateStarWarsServer(output: _output);
249+
var diagnostics = new SubscriptionTestDiagnostics();
250+
using var testServer = CreateStarWarsServer(
251+
configureServices: c =>
252+
{
253+
c.AddGraphQL()
254+
.AddDiagnosticEventListener(_ => diagnostics);
255+
},
256+
output: _output);
250257
var client = CreateWebSocketClient(testServer);
251258
using var webSocket = await ConnectToServerAsync(client, ct);
252259

@@ -257,18 +264,25 @@ public Task Subscribe_ReceiveDataOnMutation()
257264
// act
258265
await webSocket.SendSubscribeAsync(subscriptionId, payload, ct);
259266

267+
while (diagnostics.Subscribed is not 1)
268+
{
269+
await Task.Delay(10, ct);
270+
}
271+
260272
await testServer.SendPostRequestAsync(
261273
new ClientQueryRequest
262274
{
263-
Query = @"
264-
mutation {
265-
createReview(episode: NEW_HOPE review: {
266-
commentary: ""foo""
267-
stars: 5
268-
}) {
269-
stars
275+
Query =
276+
"""
277+
mutation {
278+
createReview(episode: NEW_HOPE review: {
279+
commentary: "foo"
280+
stars: 5
281+
}) {
282+
stars
283+
}
270284
}
271-
}"
285+
"""
272286
});
273287

274288
// assert
@@ -506,11 +520,20 @@ await TryTest(
506520
var payload = new SubscribePayload(
507521
"subscription { onReview(episode: NEW_HOPE) { stars } }");
508522

523+
var stopwatch = Stopwatch.StartNew();
524+
509525
for (var i = 0; i < 600; i++)
510526
{
511527
await webSocket.SendSubscribeAsync(i.ToString(), payload, ct);
512528
}
513-
529+
530+
while(diagnostics.Subscribed < 600)
531+
{
532+
await Task.Delay(10, ct);
533+
}
534+
535+
_output.WriteLine($"Subscribed in {stopwatch.ElapsedMilliseconds}ms");
536+
514537
await testServer.SendPostRequestAsync(
515538
new ClientQueryRequest
516539
{
@@ -534,10 +557,7 @@ await testServer.SendPostRequestAsync(
534557
await testServer.SendPostRequestAsync(
535558
new ClientQueryRequest
536559
{
537-
Query = @"
538-
mutation {
539-
complete(episode:NEW_HOPE)
540-
}"
560+
Query = @"mutation { complete(episode:NEW_HOPE) }"
541561
});
542562

543563
// assert
@@ -936,10 +956,19 @@ public override ValueTask OnPongAsync(
936956

937957
public sealed class SubscriptionTestDiagnostics : SubscriptionDiagnosticEventsListener
938958
{
959+
private int _subscribed;
960+
961+
public int Subscribed => _subscribed;
962+
939963
public bool UnsubscribeInvoked { get; private set; }
940964

941965
public bool CloseInvoked { get; private set; }
942966

967+
public override void SubscribeSuccess(string topicName)
968+
{
969+
Interlocked.Increment(ref _subscribed);
970+
}
971+
943972
public override void Unsubscribe(string topicName, int shard, int subscribers)
944973
=> UnsubscribeInvoked = true;
945974

src/HotChocolate/Core/src/Subscriptions.InMemory/InMemoryPubSub.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,24 @@ public InMemoryPubSub(
1919
_diagnosticEvents = diagnosticEvents;
2020
}
2121

22-
protected override async ValueTask OnSendAsync<TMessage>(
22+
protected override ValueTask OnSendAsync<TMessage>(
2323
string formattedTopic,
2424
TMessage message,
2525
CancellationToken cancellationToken = default)
2626
{
2727
if (TryGetTopic<TMessage>(formattedTopic, out var topic))
2828
{
29-
await topic.PublishAsync(message, cancellationToken).ConfigureAwait(false);
29+
topic.Publish(message);
3030
}
31+
32+
return ValueTask.CompletedTask;
3133
}
3234

3335
protected override ValueTask OnCompleteAsync(string formattedTopic)
3436
{
3537
if (TryGetTopic(formattedTopic, out var topic))
3638
{
37-
topic.TryComplete();
39+
topic.Complete();
3840
}
3941

4042
return ValueTask.CompletedTask;

src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,37 +31,14 @@ protected override async ValueTask<IDisposable> OnConnectAsync(
3131
Debug.Assert(_connection != null, "_serializer != null");
3232

3333
var natsSession = await _connection
34-
.SubscribeAsync(
35-
Name,
36-
async (string? m) => await Dispatch(m, cancellationToken).ConfigureAwait(false))
34+
.SubscribeAsync(Name, (string? m) => DispatchMessage(_serializer, m))
3735
.ConfigureAwait(false);
3836

3937
DiagnosticEvents.ProviderTopicInfo(Name, NatsTopic_ConnectAsync_SubscribedToNats);
4038

4139
return new Session(Name, natsSession, DiagnosticEvents);
4240
}
4341

44-
private async ValueTask Dispatch(
45-
string? serializedMessage,
46-
CancellationToken cancellationToken)
47-
{
48-
// we ensure that if there is noise on the channel we filter it out.
49-
if (!string.IsNullOrEmpty(serializedMessage))
50-
{
51-
DiagnosticEvents.Received(Name, serializedMessage);
52-
var envelope = _serializer.Deserialize<TMessage>(serializedMessage);
53-
54-
if (envelope.Kind is MessageKind.Completed)
55-
{
56-
TryComplete();
57-
}
58-
else if(envelope.Body is { } body)
59-
{
60-
await PublishAsync(body, cancellationToken);
61-
}
62-
}
63-
}
64-
6542
private sealed class Session : IDisposable
6643
{
6744
private readonly string _name;

src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQConnectionFailedException.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,14 @@ public RabbitMQConnectionFailedException(int connectionAttempts)
1313
{
1414
}
1515

16-
public RabbitMQConnectionFailedException(SerializationInfo info, StreamingContext context) : base(info, context)
16+
#if NET8_0_OR_GREATER
17+
[Obsolete(
18+
"This API supports obsolete formatter-based serialization. " +
19+
"It should not be called or extended by application code.",
20+
true)]
21+
#endif
22+
public RabbitMQConnectionFailedException(SerializationInfo info, StreamingContext context)
23+
: base(info, context)
1724
{
1825
}
1926
}

src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.Diagnostics;
22
using System.Text;
3-
using System.Threading.Channels;
43
using RabbitMQ.Client;
54
using HotChocolate.Subscriptions.Diagnostics;
65
using RabbitMQ.Client.Events;
@@ -37,17 +36,19 @@ protected override async ValueTask<IDisposable> OnConnectAsync(
3736
var queueName = Guid.NewGuid().ToString();
3837
var consumer = CreateConsumer(channel, queueName);
3938

40-
async Task Received(object sender, BasicDeliverEventArgs args)
39+
Task Received(object sender, BasicDeliverEventArgs args)
4140
{
4241
try
4342
{
4443
var serializedMessage = Encoding.UTF8.GetString(args.Body.Span);
45-
await DispatchAsync(serializedMessage, cancellationToken).ConfigureAwait(false);
44+
DispatchMessage(_serializer, serializedMessage);
4645
}
4746
finally
4847
{
4948
channel.BasicAck(args.DeliveryTag, false);
5049
}
50+
51+
return Task.CompletedTask;
5152
}
5253

5354
consumer.Received += Received;
@@ -64,28 +65,6 @@ async Task Received(object sender, BasicDeliverEventArgs args)
6465
});
6566
}
6667

67-
private async ValueTask DispatchAsync(
68-
string serializedMessage,
69-
CancellationToken cancellationToken)
70-
{
71-
// we ensure that if there is noise on the channel we filter it out.
72-
if (!string.IsNullOrEmpty(serializedMessage))
73-
{
74-
DiagnosticEvents.Received(Name, serializedMessage);
75-
76-
var envelope = _serializer.Deserialize<TMessage>(serializedMessage);
77-
78-
if (envelope.Kind is MessageKind.Completed)
79-
{
80-
TryComplete();
81-
}
82-
else if (envelope.Body is { } body)
83-
{
84-
await PublishAsync(body, cancellationToken).ConfigureAwait(false);
85-
}
86-
}
87-
}
88-
8968
private AsyncEventingBasicConsumer CreateConsumer(IModel channel, string queueName)
9069
{
9170
channel.ExchangeDeclare(

src/HotChocolate/Core/src/Subscriptions.Redis/RedisTopic.cs

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Diagnostics;
2-
using System.Threading.Channels;
32
using HotChocolate.Subscriptions.Diagnostics;
43
using StackExchange.Redis;
54
using static HotChocolate.Subscriptions.Redis.Properties.Resources;
@@ -34,38 +33,10 @@ protected override async ValueTask<IDisposable> OnConnectAsync(
3433
var subscriber = _connection.GetSubscriber();
3534
var messageQueue = await subscriber.SubscribeAsync(Name).ConfigureAwait(false);
3635
DiagnosticEvents.ProviderTopicInfo(Name, RedisTopic_SubscribedToRedis);
37-
38-
messageQueue.OnMessage(
39-
async redisMessage =>
40-
{
41-
await DispatchAsync(redisMessage.Message.ToString(), cancellationToken)
42-
.ConfigureAwait(false);
43-
});
44-
36+
messageQueue.OnMessage(redisMessage => DispatchMessage(_serializer, redisMessage.Message.ToString()));
4537
return new Session(Name, _connection, DiagnosticEvents);
4638
}
4739

48-
private async ValueTask DispatchAsync(
49-
string? serializedMessage,
50-
CancellationToken cancellationToken)
51-
{
52-
// we ensure that if there is noise on the channel we filter it out.
53-
if (!string.IsNullOrEmpty(serializedMessage))
54-
{
55-
DiagnosticEvents.Received(Name, serializedMessage);
56-
var envelope = _serializer.Deserialize<TMessage>(serializedMessage);
57-
58-
if (envelope.Kind is MessageKind.Completed)
59-
{
60-
TryComplete();
61-
}
62-
else if (envelope.Body is { } body)
63-
{
64-
await PublishAsync(body, cancellationToken).ConfigureAwait(false);
65-
}
66-
}
67-
}
68-
6940
private sealed class Session : IDisposable
7041
{
7142
private readonly string _name;

0 commit comments

Comments
 (0)