Skip to content

Commit 17f505d

Browse files
authored
Unsubscribe from redis topic async (#8200)
1 parent 4140e36 commit 17f505d

File tree

5 files changed

+170
-15
lines changed

5 files changed

+170
-15
lines changed

src/HotChocolate/Core/src/Subscriptions.Redis/RedisTopic.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ protected override async ValueTask<IDisposable> OnConnectAsync(
3737
return new Session(Name, _connection, DiagnosticEvents);
3838
}
3939

40-
private sealed class Session : IDisposable
40+
private sealed class Session : IDisposable, IAsyncDisposable
4141
{
4242
private readonly string _name;
4343
private readonly IConnectionMultiplexer _connection;
@@ -56,12 +56,26 @@ public Session(
5656

5757
public void Dispose()
5858
{
59-
if (!_disposed)
59+
if (_disposed)
6060
{
61-
_connection.GetSubscriber().Unsubscribe(_name);
62-
_diagnosticEvents.ProviderTopicInfo(_name, RedisTopic_UnsubscribedFromRedis);
63-
_disposed = true;
61+
return;
6462
}
63+
64+
_connection.GetSubscriber().Unsubscribe(_name);
65+
_diagnosticEvents.ProviderTopicInfo(_name, RedisTopic_UnsubscribedFromRedis);
66+
_disposed = true;
67+
}
68+
69+
public async ValueTask DisposeAsync()
70+
{
71+
if (_disposed)
72+
{
73+
return;
74+
}
75+
76+
await _connection.GetSubscriber().UnsubscribeAsync(_name);
77+
_diagnosticEvents.ProviderTopicInfo(_name, RedisTopic_UnsubscribedFromRedis);
78+
_disposed = true;
6579
}
6680
}
6781
}

src/HotChocolate/Core/src/Subscriptions/DefaultTopic.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,15 @@ private async Task ProcessMessagesSessionAsync(IDisposable session)
208208
}
209209
finally
210210
{
211-
session.Dispose();
211+
if (session is IAsyncDisposable asyncDisposableSession)
212+
{
213+
await asyncDisposableSession.DisposeAsync().ConfigureAwait(false);
214+
}
215+
else
216+
{
217+
session.Dispose();
218+
}
219+
212220
DiagnosticEvents.Disconnected(Name);
213221
}
214222
}

src/HotChocolate/Core/test/Subscriptions.Redis.Tests/RedisIntegrationTests.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using HotChocolate.Execution;
12
using HotChocolate.Execution.Configuration;
23
using Microsoft.Extensions.DependencyInjection;
34
using Squadron;
5+
using StackExchange.Redis;
46
using Xunit.Abstractions;
57

68
namespace HotChocolate.Subscriptions.Redis;
@@ -47,6 +49,45 @@ public override Task Subscribe_And_Complete_Topic()
4749
public override Task Subscribe_And_Complete_Topic_With_ValueTypeMessage()
4850
=> base.Subscribe_And_Complete_Topic_With_ValueTypeMessage();
4951

52+
[Fact]
53+
public async Task Unsubscribe_Should_RemoveChannel()
54+
{
55+
using var cts = new CancellationTokenSource(Timeout);
56+
await using var services = CreateServer<Subscription>();
57+
58+
var result = await services.ExecuteRequestAsync(
59+
"subscription { onMessage }",
60+
cancellationToken: cts.Token);
61+
62+
var activeChannelsAfterSubscribe = await GetActiveChannelsAsync();
63+
64+
await result.DisposeAsync();
65+
66+
var channelRemovedEvent = new ManualResetEventSlim(false);
67+
68+
_ = Task.Run(async () =>
69+
{
70+
while (!cts.Token.IsCancellationRequested)
71+
{
72+
var activeChannels = await GetActiveChannelsAsync();
73+
if (activeChannels.Length < activeChannelsAfterSubscribe.Length)
74+
{
75+
channelRemovedEvent.Set();
76+
break;
77+
}
78+
79+
await Task.Delay(100, cts.Token);
80+
}
81+
}, cts.Token);
82+
83+
channelRemovedEvent.Wait(cts.Token);
84+
}
85+
86+
private async Task<RedisResult[]> GetActiveChannelsAsync()
87+
{
88+
return (RedisResult[])(await _redisResource.GetConnection().GetDatabase().ExecuteAsync("PUBSUB", "CHANNELS"))!;
89+
}
90+
5091
protected override void ConfigurePubSub(IRequestExecutorBuilder graphqlBuilder)
5192
=> graphqlBuilder.AddRedisSubscriptions(_ => _redisResource.GetConnection());
5293
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using HotChocolate.Subscriptions.Diagnostics;
2+
using HotChocolate.Tests;
3+
using Xunit.Abstractions;
4+
5+
namespace HotChocolate.Subscriptions;
6+
7+
public class DefaultTopicTests(ITestOutputHelper outputHelper)
8+
{
9+
[Fact]
10+
public async Task Unsubscribe_ForAsyncDisposableSession_DisposesAsync()
11+
{
12+
var sessionMock = new StubAsyncDisposableSession();
13+
var pubSub = new NoOpPubSub(sessionMock, new SubscriptionTestDiagnostics(outputHelper));
14+
15+
var sourceStream = await pubSub.SubscribeAsync<string>("topic");
16+
await sourceStream.DisposeAsync();
17+
18+
SpinWait.SpinUntil(() => sessionMock.AsyncDisposableCalled, TimeSpan.FromSeconds(5));
19+
Assert.False(sessionMock.DisposableCalled);
20+
}
21+
22+
[Fact]
23+
public async Task Unsubscribe_ForSyncDisposableSession_DisposesSync()
24+
{
25+
var sessionMock = new StubDisposableSession();
26+
var pubSub = new NoOpPubSub(sessionMock, new SubscriptionTestDiagnostics(outputHelper));
27+
28+
var sourceStream = await pubSub.SubscribeAsync<string>("topic");
29+
await sourceStream.DisposeAsync();
30+
31+
SpinWait.SpinUntil(() => sessionMock.DisposableCalled, TimeSpan.FromSeconds(5));
32+
}
33+
34+
private sealed class NoOpPubSub(IDisposable session, ISubscriptionDiagnosticEvents diagnosticEvents)
35+
: DefaultPubSub(new SubscriptionOptions(), diagnosticEvents)
36+
{
37+
protected override ValueTask OnSendAsync<TMessage>(string formattedTopic, TMessage message, CancellationToken cancellationToken = default)
38+
{
39+
return ValueTask.CompletedTask;
40+
}
41+
42+
protected override ValueTask OnCompleteAsync(string formattedTopic)
43+
{
44+
return ValueTask.CompletedTask;
45+
}
46+
47+
protected override DefaultTopic<TMessage> OnCreateTopic<TMessage>(string formattedTopic, int? bufferCapacity, TopicBufferFullMode? bufferFullMode)
48+
{
49+
return new AsyncDisposableTopic<TMessage>(
50+
formattedTopic,
51+
bufferCapacity ?? 1,
52+
bufferFullMode ?? TopicBufferFullMode.DropOldest,
53+
DiagnosticEvents,
54+
session);
55+
}
56+
57+
private sealed class AsyncDisposableTopic<TMessage>(
58+
string name,
59+
int capacity,
60+
TopicBufferFullMode fullMode,
61+
ISubscriptionDiagnosticEvents diagnosticEvents,
62+
IDisposable session)
63+
: DefaultTopic<TMessage>(name, capacity, fullMode, diagnosticEvents)
64+
{
65+
protected override ValueTask<IDisposable> OnConnectAsync(CancellationToken cancellationToken)
66+
{
67+
return ValueTask.FromResult<IDisposable>(session);
68+
}
69+
}
70+
}
71+
72+
private class StubDisposableSession : IDisposable
73+
{
74+
public bool DisposableCalled { get; private set; } = false;
75+
76+
public void Dispose()
77+
{
78+
DisposableCalled = true;
79+
}
80+
}
81+
82+
private class StubAsyncDisposableSession : StubDisposableSession, IAsyncDisposable
83+
{
84+
public bool AsyncDisposableCalled { get; private set; } = false;
85+
86+
public ValueTask DisposeAsync()
87+
{
88+
AsyncDisposableCalled = true;
89+
return ValueTask.CompletedTask;
90+
}
91+
}
92+
}

src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace HotChocolate.Subscriptions;
1010

1111
public abstract class SubscriptionIntegrationTestBase
1212
{
13-
private static readonly int _timeout = Debugger.IsAttached ? 1000000 : 5000;
13+
protected static readonly int Timeout = Debugger.IsAttached ? 1000000 : 5000;
1414
private readonly ITestOutputHelper _output;
1515

1616
protected SubscriptionIntegrationTestBase(ITestOutputHelper output)
@@ -22,7 +22,7 @@ protected SubscriptionIntegrationTestBase(ITestOutputHelper output)
2222
public virtual async Task Subscribe_Infer_Topic()
2323
{
2424
// arrange
25-
using var cts = new CancellationTokenSource(_timeout);
25+
using var cts = new CancellationTokenSource(Timeout);
2626
await using var services = CreateServer<Subscription>();
2727
var sender = services.GetRequiredService<ITopicEventSender>();
2828

@@ -58,7 +58,7 @@ public virtual async Task Subscribe_Infer_Topic()
5858
public virtual async Task Subscribe_Static_Topic()
5959
{
6060
// arrange
61-
using var cts = new CancellationTokenSource(_timeout);
61+
using var cts = new CancellationTokenSource(Timeout);
6262
await using var services = CreateServer<Subscription2>();
6363
var sender = services.GetRequiredService<ITopicEventSender>();
6464

@@ -96,7 +96,7 @@ public virtual async Task Subscribe_Static_Topic()
9696
public virtual async Task Subscribe_Topic_With_Arguments()
9797
{
9898
// arrange
99-
using var cts = new CancellationTokenSource(_timeout);
99+
using var cts = new CancellationTokenSource(Timeout);
100100
await using var services = CreateServer<Subscription3>();
101101
var sender = services.GetRequiredService<ITopicEventSender>();
102102

@@ -132,7 +132,7 @@ public virtual async Task Subscribe_Topic_With_Arguments()
132132
public virtual async Task Subscribe_Topic_With_Arguments_2_Subscriber()
133133
{
134134
// arrange
135-
using var cts = new CancellationTokenSource(_timeout);
135+
using var cts = new CancellationTokenSource(Timeout);
136136
await using var services = CreateServer<Subscription3>();
137137
var sender = services.GetRequiredService<ITopicEventSender>();
138138

@@ -193,7 +193,7 @@ From Stream 2
193193
public virtual async Task Subscribe_Topic_With_Arguments_2_Topics()
194194
{
195195
// arrange
196-
using var cts = new CancellationTokenSource(_timeout);
196+
using var cts = new CancellationTokenSource(Timeout);
197197
await using var services = CreateServer<Subscription3>();
198198
var sender = services.GetRequiredService<ITopicEventSender>();
199199

@@ -257,7 +257,7 @@ From Stream 2
257257
public virtual async Task Subscribe_Topic_With_2_Arguments()
258258
{
259259
// arrange
260-
using var cts = new CancellationTokenSource(_timeout);
260+
using var cts = new CancellationTokenSource(Timeout);
261261
await using var services = CreateServer<Subscription3>();
262262
var sender = services.GetRequiredService<ITopicEventSender>();
263263

@@ -293,7 +293,7 @@ public virtual async Task Subscribe_Topic_With_2_Arguments()
293293
public virtual async Task Subscribe_And_Complete_Topic()
294294
{
295295
// arrange
296-
using var cts = new CancellationTokenSource(_timeout);
296+
using var cts = new CancellationTokenSource(Timeout);
297297
await using var services = CreateServer<Subscription2>();
298298
var sender = services.GetRequiredService<ITopicEventSender>();
299299

@@ -320,7 +320,7 @@ public virtual async Task Subscribe_And_Complete_Topic()
320320
public virtual async Task Subscribe_And_Complete_Topic_With_ValueTypeMessage()
321321
{
322322
// arrange
323-
using var cts = new CancellationTokenSource(_timeout);
323+
using var cts = new CancellationTokenSource(Timeout);
324324
await using var services = CreateServer<Subscription3>();
325325
var sender = services.GetRequiredService<ITopicEventSender>();
326326

0 commit comments

Comments
 (0)