Skip to content

Commit 57dc534

Browse files
committed
deduplicate redis hub lifetime group msges
1 parent a5b88af commit 57dc534

File tree

3 files changed

+33
-2
lines changed

3 files changed

+33
-2
lines changed

src/SignalR/server/Core/src/DefaultHubLifetimeManager.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ public override Task SendGroupAsync(string groupName, string methodName, object?
225225
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object?[] args, CancellationToken cancellationToken = default)
226226
{
227227
// Each task represents the list of tasks for each of the writes within a group
228+
List<Task>? tasks = new List<Task>();
229+
228230
var connections = new ConcurrentDictionary<string, HubConnectionContext>();
229231

230232
foreach (var groupName in groupNames)
@@ -246,7 +248,12 @@ public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string me
246248

247249
foreach (var connection in connections)
248250
{
249-
SendConnectionAsync(connection.Key, methodName, args, cancellationToken);
251+
tasks.Add(SendConnectionAsync(connection.Key, methodName, args, cancellationToken));
252+
}
253+
254+
if (tasks != null)
255+
{
256+
return Task.WhenAll(tasks);
250257
}
251258

252259
return Task.CompletedTask;

src/SignalR/server/StackExchangeRedis/src/Internal/RedisSubscriptionManager.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,14 @@ public async Task RemoveSubscriptionAsync(string id, HubConnectionContext connec
6363
_lock.Release();
6464
}
6565
}
66+
67+
public HubConnectionStore? GetStore(string id)
68+
{
69+
if (_subscriptions.TryGetValue(id, out var store))
70+
{
71+
return store;
72+
}
73+
74+
return null;
75+
}
6676
}

src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,15 +259,29 @@ public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string me
259259
ArgumentNullException.ThrowIfNull(groupNames);
260260
var publishTasks = new List<Task>(groupNames.Count);
261261
var payload = _protocol.WriteInvocation(methodName, args);
262+
var connections = new HashSet<string>();
262263

263264
foreach (var groupName in groupNames)
264265
{
265266
if (!string.IsNullOrEmpty(groupName))
266267
{
267-
publishTasks.Add(PublishAsync(_channels.Group(groupName), payload));
268+
var connectionStore = _groups.GetStore(groupName);
269+
if (connectionStore != null)
270+
{
271+
foreach (var connection in connectionStore)
272+
{
273+
connections.Add(connection.ConnectionId);
274+
}
275+
}
268276
}
269277
}
270278

279+
// Send to unique connections
280+
foreach (var connectionId in connections)
281+
{
282+
publishTasks.Add(PublishAsync(_channels.Connection(connectionId), payload));
283+
}
284+
271285
return Task.WhenAll(publishTasks);
272286
}
273287

0 commit comments

Comments
 (0)