From c393463b45713865f31f644f585292930cce49cf Mon Sep 17 00:00:00 2001 From: Damiaan Twelker Date: Wed, 7 May 2025 09:07:51 +0200 Subject: [PATCH] deduplicate group messages across connections --- .../Core/src/DefaultHubLifetimeManager.cs | 23 ++++++++++++++-- .../DefaultHubLifetimeManagerTests.cs | 19 +++++++++++++ .../src/Internal/RedisSubscriptionManager.cs | 10 +++++++ .../src/RedisHubLifetimeManager.cs | 27 +++++++++++++++---- .../test/RedisHubLifetimeManagerTests.cs | 24 +++++++++++++++++ 5 files changed, 96 insertions(+), 7 deletions(-) diff --git a/src/SignalR/server/Core/src/DefaultHubLifetimeManager.cs b/src/SignalR/server/Core/src/DefaultHubLifetimeManager.cs index 1eba4726f893..2ea5bfccaa94 100644 --- a/src/SignalR/server/Core/src/DefaultHubLifetimeManager.cs +++ b/src/SignalR/server/Core/src/DefaultHubLifetimeManager.cs @@ -226,7 +226,7 @@ public override Task SendGroupsAsync(IReadOnlyList groupNames, string me { // Each task represents the list of tasks for each of the writes within a group List? tasks = null; - SerializedHubMessage? message = null; + HashSet? connections = null; foreach (var groupName in groupNames) { @@ -238,7 +238,26 @@ public override Task SendGroupsAsync(IReadOnlyList groupNames, string me var group = _groups[groupName]; if (group != null) { - DefaultHubLifetimeManager.SendToGroupConnections(methodName, args, group, null, null, ref tasks, ref message, cancellationToken); + foreach (var connection in group) + { + if (connections == null) + { + connections = new HashSet(); + } + connections.Add(connection.Key); + } + } + } + + if (connections != null) + { + foreach (var connectionId in connections) + { + if (tasks == null) + { + tasks = new List(); + } + tasks.Add(SendConnectionAsync(connectionId, methodName, args, cancellationToken)); } } diff --git a/src/SignalR/server/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs b/src/SignalR/server/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs index eeabbdbd5c88..96e0a0119ff3 100644 --- a/src/SignalR/server/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs +++ b/src/SignalR/server/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs @@ -255,4 +255,23 @@ public async Task SendUsersAsyncWillCancelWithToken() Assert.False(connection1.ConnectionAborted.IsCancellationRequested); } } + + [Fact] + public async Task ConnectionInMultipleGroups_ReceivesMessageOnlyOnce() + { + using (var client1 = new TestClient()) + { + var manager = CreateNewHubLifetimeManager(); + var connection1 = HubConnectionContextUtils.Create(client1.Connection); + await manager.OnConnectedAsync(connection1).DefaultTimeout(); + await manager.AddToGroupAsync(connection1.ConnectionId, "group1").DefaultTimeout(); + await manager.AddToGroupAsync(connection1.ConnectionId, "group2").DefaultTimeout(); + await manager.SendGroupsAsync(new List { "group1", "group2" }, "Hello", new object[] { "World" }).DefaultTimeout(); + var message = Assert.IsType(client1.TryRead()); + Assert.Equal("Hello", message.Target); + Assert.Single(message.Arguments); + Assert.Equal("World", (string)message.Arguments[0]); + Assert.Null(client1.TryRead()); + } + } } diff --git a/src/SignalR/server/StackExchangeRedis/src/Internal/RedisSubscriptionManager.cs b/src/SignalR/server/StackExchangeRedis/src/Internal/RedisSubscriptionManager.cs index 1f69a1406b95..bd4c558f7b2a 100644 --- a/src/SignalR/server/StackExchangeRedis/src/Internal/RedisSubscriptionManager.cs +++ b/src/SignalR/server/StackExchangeRedis/src/Internal/RedisSubscriptionManager.cs @@ -63,4 +63,14 @@ public async Task RemoveSubscriptionAsync(string id, HubConnectionContext connec _lock.Release(); } } + + public HubConnectionStore? GetStore(string id) + { + if (_subscriptions.TryGetValue(id, out var store)) + { + return store; + } + + return null; + } } diff --git a/src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs b/src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs index f1743fa5f2d6..3147daa18b83 100644 --- a/src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs +++ b/src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs @@ -254,21 +254,38 @@ public override Task SendConnectionsAsync(IReadOnlyList connectionIds, s } /// - public override Task SendGroupsAsync(IReadOnlyList groupNames, string methodName, object?[] args, CancellationToken cancellationToken = default) + public override async Task SendGroupsAsync(IReadOnlyList groupNames, string methodName, object?[] args, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(groupNames); - var publishTasks = new List(groupNames.Count); - var payload = _protocol.WriteInvocation(methodName, args); + HashSet? connections = null; foreach (var groupName in groupNames) { if (!string.IsNullOrEmpty(groupName)) { - publishTasks.Add(PublishAsync(_channels.Group(groupName), payload)); + var groupChannel = _channels.Group(groupName); + if (groupChannel != null) + { + var connectionStore = _groups.GetStore(groupChannel); + if (connectionStore != null) + { + foreach (var connection in connectionStore) + { + if (connections == null) + { + connections = new HashSet(); + } + connections.Add(connection.ConnectionId); + } + } + } } } - return Task.WhenAll(publishTasks); + if (connections != null) + { + await SendConnectionsAsync(connections.ToList(), methodName, args, cancellationToken); + } } /// diff --git a/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs b/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs index 3346446f4ec9..d4ba56c8919d 100644 --- a/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs +++ b/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs @@ -145,6 +145,30 @@ public async Task PatternGroupAndUser() } } + [Fact] + public async Task ConnectionInMultipleGroups_ReceivesMessageOnlyOnce() + { + var server = new TestRedisServer(); + + using (var client = new TestClient()) + { + var manager = CreateLifetimeManager(server); + var connection = HubConnectionContextUtils.Create(client.Connection); + + await manager.OnConnectedAsync(connection).DefaultTimeout(); + await manager.AddToGroupAsync(connection.ConnectionId, "group1").DefaultTimeout(); + await manager.AddToGroupAsync(connection.ConnectionId, "group2").DefaultTimeout(); + + await manager.SendGroupsAsync(new[] { "group1", "group2" }, "Hello", new object[] { "World" }).DefaultTimeout(); + + var message = Assert.IsType(await client.ReadAsync().DefaultTimeout()); + Assert.Equal("Hello", message.Target); + Assert.Single(message.Arguments); + Assert.Equal("World", (string)message.Arguments[0]); + Assert.Null(client.TryRead()); + } + } + public override TestRedisServer CreateBackplane() { return new TestRedisServer();