From 17e81a0bf11bd2ead2862026cae3bdb2f1d22373 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 17 Jun 2025 15:34:22 +0200 Subject: [PATCH 01/19] wip Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index f34b367f..5de1bdeb 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -132,13 +132,11 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable private readonly Channel _chunksBuffer; private readonly ushort _initialCredits; - // _completeSubscription is used to notify the ProcessChunks task // that the subscription is completed and so it can start to process the chunks // this is needed because the socket starts to receive the chunks before the subscription_id is // assigned. - private readonly TaskCompletionSource _completeSubscription = new(); - + private readonly TaskCompletionSource _completeSubscription = new(TaskCreationOptions.RunContinuationsAsynchronously); protected sealed override string DumpEntityConfiguration() { var superStream = string.IsNullOrEmpty(_config.SuperStream) @@ -161,6 +159,8 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null); // _chunksBuffer is a channel that is used to buffer the chunks + + _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) { AllowSynchronousContinuations = false, @@ -461,6 +461,7 @@ private void ProcessChunks() // need to wait the subscription is completed // else the _subscriberId could be incorrect _completeSubscription.Task.Wait(); + try { while (!Token.IsCancellationRequested && From dc853ae323a9f662a2a1258dae136a39f3b8d6c4 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 17 Jun 2025 16:34:25 +0200 Subject: [PATCH 02/19] wip Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 5 ++--- global.json | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 5de1bdeb..9fbfed71 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -159,8 +159,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null); // _chunksBuffer is a channel that is used to buffer the chunks - - + _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) { AllowSynchronousContinuations = false, @@ -461,7 +460,7 @@ private void ProcessChunks() // need to wait the subscription is completed // else the _subscriberId could be incorrect _completeSubscription.Task.Wait(); - + try { while (!Token.IsCancellationRequested && diff --git a/global.json b/global.json index 47a7fa60..af8d7f46 100644 --- a/global.json +++ b/global.json @@ -1,5 +1,6 @@ { "sdk": { - "allowPrerelease": false + "allowPrerelease": false, + "version": "9.0.202" } } From 3dccb3c3170599fb3a9cf6e05a11dc0487ba2f45 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 17 Jun 2025 16:44:49 +0200 Subject: [PATCH 03/19] pin version Signed-off-by: Gabriele Santomaggio --- .github/workflows/build-test.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 6fccf392..f4a12a5f 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -8,6 +8,12 @@ jobs: name: build/test on windows-latest runs-on: windows-latest steps: + - name: Clone repository + uses: actions/checkout@v4 + - name: Setup .NET SDK + uses: actions/setup-dotnet@v4 + with: + global-json-file: global.json - uses: actions/checkout@v4 - uses: actions/cache@v4 with: @@ -35,6 +41,12 @@ jobs: name: build/test on ubuntu-latest runs-on: ubuntu-latest steps: + - name: Clone repository + uses: actions/checkout@v4 + - name: Setup .NET SDK + uses: actions/setup-dotnet@v4 + with: + global-json-file: global.json - uses: actions/checkout@v4 - uses: actions/setup-dotnet@v4 with: From ef91bd8a3f9eac38cde092d09250cf79f2522109 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 17 Jun 2025 22:13:37 +0200 Subject: [PATCH 04/19] refactor Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 9fbfed71..0ab619ff 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -265,8 +265,8 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int var slice = unCompressedData.Slice(compressOffset, 4); compressOffset += WireFormatting.ReadUInt32(ref slice, out var len); Debug.Assert(len > 0); - slice = unCompressedData.Slice(compressOffset, len); - Debug.Assert(slice.Length >= len); + var sliceMsg = unCompressedData.Slice(compressOffset, len); + Debug.Assert(sliceMsg.Length == len); compressOffset += (int)len; // Here we use the Message.From(ref ReadOnlySequence seq ..) method to parse the message @@ -274,7 +274,7 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int // Since the ParseChunk is async and we cannot use the ref SequenceReader reader // See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 for more details - var message = Message.From(ref slice, len); + var message = Message.From(ref sliceMsg, len); return message; } catch (Exception e) From 4f54f2e30607c61cb0a1a87aba657f3d848520c9 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 24 Jun 2025 15:23:11 +0200 Subject: [PATCH 05/19] Enable CRC32 by default add System.IO.Hashing dependecy Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/ClientExceptions.cs | 8 ----- RabbitMQ.Stream.Client/IConsumer.cs | 8 +++-- RabbitMQ.Stream.Client/ICrc32.cs | 15 ++++++++++ .../PublicAPI.Unshipped.txt | 12 ++++++-- .../RabbitMQ.Stream.Client.csproj | 1 + RabbitMQ.Stream.Client/RawConsumer.cs | 29 +++++++++++++++---- .../StreamCrc32.cs | 8 ++--- Tests/RawConsumerSystemTests.cs | 2 +- Tests/ReliableTests.cs | 5 +++- Tests/SuperStreamConsumerTests.cs | 2 -- Tests/Tests.csproj | 1 - docs/Documentation/ConsumerUsage.cs | 12 ++------ docs/Documentation/Documentation.csproj | 1 - 13 files changed, 66 insertions(+), 38 deletions(-) rename Tests/Crc32.cs => RabbitMQ.Stream.Client/StreamCrc32.cs (69%) diff --git a/RabbitMQ.Stream.Client/ClientExceptions.cs b/RabbitMQ.Stream.Client/ClientExceptions.cs index 71d47c6f..95e2db59 100644 --- a/RabbitMQ.Stream.Client/ClientExceptions.cs +++ b/RabbitMQ.Stream.Client/ClientExceptions.cs @@ -177,14 +177,6 @@ public UnknownCommandException(string s) } } - public class CrcException : Exception - { - public CrcException(string s) - : base(s) - { - } - } - public class TooManyConnectionsException : Exception { public TooManyConnectionsException(string s) diff --git a/RabbitMQ.Stream.Client/IConsumer.cs b/RabbitMQ.Stream.Client/IConsumer.cs index b44e48b2..0dd1ea50 100644 --- a/RabbitMQ.Stream.Client/IConsumer.cs +++ b/RabbitMQ.Stream.Client/IConsumer.cs @@ -73,8 +73,9 @@ public ushort InitialCredits // enables the check of the crc on the delivery. // the server will send the crc for each chunk and the client will check it. - // It is not enabled by default because it is could reduce the performance. - public ICrc32 Crc32 { get; set; } = null; + // It is enabled by default. You can disable it by setting it to null. + // It is recommended to keep it enabled. Disable it only for performance reasons. + public ICrc32 Crc32 { get; set; } = new StreamCrc32(); } public class ConsumerInfo : Info @@ -90,6 +91,7 @@ public ConsumerInfo(string stream, string reference, string identifier, List + /// The consumer will Skip the Chunk and continue processing the next message. + /// + SkipChunk, + + /// + /// The consumer will receive the message, but it will be marked as invalid and + /// the consumer will be closed. + /// + CloseConsumer + } + /// /// ICrc32 defines an interface for implementing crc32 hashing. /// Library users who wish to perform crc32 checks on data from RabbitMQ @@ -13,5 +27,6 @@ namespace RabbitMQ.Stream.Client public interface ICrc32 { byte[] Hash(byte[] data); + CrcFailureAction CrcFailureAction { get; set; } } } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index e66563b2..1a730ed7 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -109,8 +109,9 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void RabbitMQ.Stream.Client.ConsumerInfo RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier, System.Collections.Generic.List partitions) -> void RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string -RabbitMQ.Stream.Client.CrcException -RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void +RabbitMQ.Stream.Client.CrcFailureAction +RabbitMQ.Stream.Client.CrcFailureAction.CloseConsumer = 1 -> RabbitMQ.Stream.Client.CrcFailureAction +RabbitMQ.Stream.Client.CrcFailureAction.SkipChunk = 0 -> RabbitMQ.Stream.Client.CrcFailureAction RabbitMQ.Stream.Client.CreateConsumerException.CreateConsumerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void RabbitMQ.Stream.Client.CreateException RabbitMQ.Stream.Client.CreateException.CreateException(string s) -> void @@ -175,6 +176,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void RabbitMQ.Stream.Client.ICrc32 +RabbitMQ.Stream.Client.ICrc32.CrcFailureAction.get -> RabbitMQ.Stream.Client.CrcFailureAction +RabbitMQ.Stream.Client.ICrc32.CrcFailureAction.set -> void RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[] RabbitMQ.Stream.Client.Info RabbitMQ.Stream.Client.Info.Identifier.get -> string @@ -322,6 +325,11 @@ RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType +RabbitMQ.Stream.Client.StreamCrc32 +RabbitMQ.Stream.Client.StreamCrc32.CrcFailureAction.get -> RabbitMQ.Stream.Client.CrcFailureAction +RabbitMQ.Stream.Client.StreamCrc32.CrcFailureAction.set -> void +RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[] +RabbitMQ.Stream.Client.StreamCrc32.StreamCrc32() -> void RabbitMQ.Stream.Client.StreamStats RabbitMQ.Stream.Client.StreamStats.CommittedChunkId() -> ulong RabbitMQ.Stream.Client.StreamStats.FirstOffset() -> ulong diff --git a/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj b/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj index 70ea8824..081a7edc 100644 --- a/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj +++ b/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj @@ -39,6 +39,7 @@ + diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 0ab619ff..48aa8da4 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -131,12 +131,16 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable private readonly RawConsumerConfig _config; private readonly Channel _chunksBuffer; + private readonly ushort _initialCredits; + // _completeSubscription is used to notify the ProcessChunks task // that the subscription is completed and so it can start to process the chunks // this is needed because the socket starts to receive the chunks before the subscription_id is // assigned. - private readonly TaskCompletionSource _completeSubscription = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _completeSubscription = + new(TaskCreationOptions.RunContinuationsAsynchronously); + protected sealed override string DumpEntityConfiguration() { var superStream = string.IsNullOrEmpty(_config.SuperStream) @@ -219,7 +223,6 @@ public async Task StoreOffset(ulong offset) /// MaybeLockDispatch is an optimization to avoid to lock the dispatch /// when the consumer is not single active consumer /// - private async Task MaybeLockDispatch() { if (_config.IsSingleActiveConsumer) @@ -606,9 +609,25 @@ private async Task Init() DumpEntityConfiguration(), chunkConsumed); - throw new CrcException( - $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " + - $"Chunk Consumed {chunkConsumed}"); + // we can skip the chunk since the CRC does not match + switch (_config.Crc32.CrcFailureAction) + { + case CrcFailureAction.SkipChunk: + Logger?.LogWarning( + "CRC32 fail. The chunk will be skipped by policy. {EntityInfo}, Chunk Consumed {ChunkConsumed}", + DumpEntityConfiguration(), chunkConsumed); + return; + + case CrcFailureAction.CloseConsumer: + Logger?.LogError( + "CRC32 fail. The consumer will be closed by policy. {EntityInfo}, Chunk Consumed {ChunkConsumed}", + DumpEntityConfiguration(), chunkConsumed); + // in this case we close the consumer + await Close().ConfigureAwait(false); + break; + default: + throw new ArgumentOutOfRangeException(); + } } } diff --git a/Tests/Crc32.cs b/RabbitMQ.Stream.Client/StreamCrc32.cs similarity index 69% rename from Tests/Crc32.cs rename to RabbitMQ.Stream.Client/StreamCrc32.cs index 7d7b3df3..86272495 100644 --- a/Tests/Crc32.cs +++ b/RabbitMQ.Stream.Client/StreamCrc32.cs @@ -2,14 +2,14 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using RabbitMQ.Stream.Client; +namespace RabbitMQ.Stream.Client; -namespace Tests; - -public class Crc32 : ICrc32 +public class StreamCrc32 : ICrc32 { public byte[] Hash(byte[] data) { return System.IO.Hashing.Crc32.Hash(data); } + + public CrcFailureAction CrcFailureAction { get; set; } = CrcFailureAction.SkipChunk; } diff --git a/Tests/RawConsumerSystemTests.cs b/Tests/RawConsumerSystemTests.cs index 7328ea6e..b208feec 100644 --- a/Tests/RawConsumerSystemTests.cs +++ b/Tests/RawConsumerSystemTests.cs @@ -20,7 +20,7 @@ namespace Tests { public class ConsumerSystemTests { - private readonly ICrc32 _crc32 = new Crc32(); + private readonly ICrc32 _crc32 = new StreamCrc32(); private readonly ITestOutputHelper testOutputHelper; public ConsumerSystemTests(ITestOutputHelper testOutputHelper) diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index c44acd9c..8a65945d 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -18,7 +18,10 @@ namespace Tests; public class ReliableTests { - private readonly ICrc32 _crc32 = new Crc32(); + private readonly ICrc32 _crc32 = new StreamCrc32() + { + CrcFailureAction = CrcFailureAction.CloseConsumer + }; private readonly ITestOutputHelper _testOutputHelper; public ReliableTests(ITestOutputHelper testOutputHelper) diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index ac45fdbc..d1d1dc92 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -19,7 +19,6 @@ namespace Tests; public class SuperStreamConsumerTests { - private readonly ICrc32 _crc32 = new Crc32(); private readonly ITestOutputHelper _testOutputHelper; public SuperStreamConsumerTests(ITestOutputHelper testOutputHelper) @@ -91,7 +90,6 @@ public async Task NumberOfMessagesConsumedShouldBeEqualsToPublished() var consumer = await system.CreateSuperStreamConsumer( new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { - Crc32 = _crc32, ClientProvidedName = clientProvidedName, Identifier = "super_stream_consumer_24680", OffsetSpec = diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj index 5db4744b..4665a6d9 100644 --- a/Tests/Tests.csproj +++ b/Tests/Tests.csproj @@ -15,7 +15,6 @@ - diff --git a/docs/Documentation/ConsumerUsage.cs b/docs/Documentation/ConsumerUsage.cs index fcc9abc1..2d470003 100644 --- a/docs/Documentation/ConsumerUsage.cs +++ b/docs/Documentation/ConsumerUsage.cs @@ -128,15 +128,7 @@ public static async Task CreateConsumerSingleUpdateListener() await streamSystem.Close().ConfigureAwait(false); } - // tag::consumer-creation-crc[] - private class UserCrc32 : ICrc32 // <1> - { - public byte[] Hash(byte[] data) - { - // Here we use the System.IO.Hashing.Crc32 implementation - return System.IO.Hashing.Crc32.Hash(data); - } - } + public static async Task CreateConsumerWithCrc() { @@ -148,7 +140,7 @@ public static async Task CreateConsumerWithCrc() streamSystem, "my-stream") { - Crc32 = new UserCrc32(), // <2> + Crc32 = new StreamCrc32(), // <2> OffsetSpec = new OffsetTypeTimestamp(), // end::consumer-creation-crc[] MessageHandler = async (stream, consumer, context, message) => // <4> diff --git a/docs/Documentation/Documentation.csproj b/docs/Documentation/Documentation.csproj index 2372da43..f2b8384c 100644 --- a/docs/Documentation/Documentation.csproj +++ b/docs/Documentation/Documentation.csproj @@ -20,7 +20,6 @@ - From 52b626fcfac8be314a45bf6579600cea8a97ac1c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 14:31:51 +0200 Subject: [PATCH 06/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/ICrc32.cs | 17 +-- .../PublicAPI.Unshipped.txt | 14 +- RabbitMQ.Stream.Client/RawConsumer.cs | 47 +++---- .../Reliable/ConsumerFactory.cs | 9 ++ .../Reliable/ProducerFactory.cs | 9 ++ RabbitMQ.Stream.Client/StreamCrc32.cs | 4 +- Tests/Crc32Tests.cs | 130 ++++++++++++++++++ Tests/ReliableTests.cs | 8 +- docs/Documentation/ConsumerUsage.cs | 5 +- 9 files changed, 198 insertions(+), 45 deletions(-) create mode 100644 Tests/Crc32Tests.cs diff --git a/RabbitMQ.Stream.Client/ICrc32.cs b/RabbitMQ.Stream.Client/ICrc32.cs index b991ec8f..770dfccd 100644 --- a/RabbitMQ.Stream.Client/ICrc32.cs +++ b/RabbitMQ.Stream.Client/ICrc32.cs @@ -2,20 +2,21 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +using System; + namespace RabbitMQ.Stream.Client { - public enum CrcFailureAction + public enum ChunkAction { /// - /// The consumer will Skip the Chunk and continue processing the next message. + /// The consumer will TryToProcess the Chunk. /// - SkipChunk, - + TryToProcess, /// - /// The consumer will receive the message, but it will be marked as invalid and - /// the consumer will be closed. + /// The consumer will Skip the Chunk and continue processing the next message. /// - CloseConsumer + Skip + } /// @@ -27,6 +28,6 @@ public enum CrcFailureAction public interface ICrc32 { byte[] Hash(byte[] data); - CrcFailureAction CrcFailureAction { get; set; } + Func FailAction { get; set; } } } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 1a730ed7..a89ca90f 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -35,6 +35,9 @@ RabbitMQ.Stream.Client.BindingsSuperStreamSpec.BindingsSuperStreamSpec(string Na RabbitMQ.Stream.Client.Chunk.Crc.get -> uint RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte +RabbitMQ.Stream.Client.ChunkAction +RabbitMQ.Stream.Client.ChunkAction.Skip = 1 -> RabbitMQ.Stream.Client.ChunkAction +RabbitMQ.Stream.Client.ChunkAction.TryToProcess = 0 -> RabbitMQ.Stream.Client.ChunkAction RabbitMQ.Stream.Client.Client.ClientId.get -> string RabbitMQ.Stream.Client.Client.ClientId.init -> void RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary @@ -109,9 +112,6 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void RabbitMQ.Stream.Client.ConsumerInfo RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier, System.Collections.Generic.List partitions) -> void RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string -RabbitMQ.Stream.Client.CrcFailureAction -RabbitMQ.Stream.Client.CrcFailureAction.CloseConsumer = 1 -> RabbitMQ.Stream.Client.CrcFailureAction -RabbitMQ.Stream.Client.CrcFailureAction.SkipChunk = 0 -> RabbitMQ.Stream.Client.CrcFailureAction RabbitMQ.Stream.Client.CreateConsumerException.CreateConsumerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void RabbitMQ.Stream.Client.CreateException RabbitMQ.Stream.Client.CreateException.CreateException(string s) -> void @@ -176,8 +176,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void RabbitMQ.Stream.Client.ICrc32 -RabbitMQ.Stream.Client.ICrc32.CrcFailureAction.get -> RabbitMQ.Stream.Client.CrcFailureAction -RabbitMQ.Stream.Client.ICrc32.CrcFailureAction.set -> void +RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func +RabbitMQ.Stream.Client.ICrc32.FailAction.set -> void RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[] RabbitMQ.Stream.Client.Info RabbitMQ.Stream.Client.Info.Identifier.get -> string @@ -326,8 +326,8 @@ RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.StreamCrc32 -RabbitMQ.Stream.Client.StreamCrc32.CrcFailureAction.get -> RabbitMQ.Stream.Client.CrcFailureAction -RabbitMQ.Stream.Client.StreamCrc32.CrcFailureAction.set -> void +RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func +RabbitMQ.Stream.Client.StreamCrc32.FailAction.set -> void RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[] RabbitMQ.Stream.Client.StreamCrc32.StreamCrc32() -> void RabbitMQ.Stream.Client.StreamStats diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 48aa8da4..cbdf72a0 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -130,7 +130,7 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable { private readonly RawConsumerConfig _config; - private readonly Channel _chunksBuffer; + private readonly Channel<(Chunk, ChunkAction)> _chunksBuffer; private readonly ushort _initialCredits; @@ -164,7 +164,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null); // _chunksBuffer is a channel that is used to buffer the chunks - _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) + _chunksBuffer = Channel.CreateBounded<(Chunk, ChunkAction)>(new BoundedChannelOptions(_initialCredits) { AllowSynchronousContinuations = false, SingleReader = true, @@ -469,10 +469,12 @@ private void ProcessChunks() while (!Token.IsCancellationRequested && await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // { - while (_chunksBuffer.Reader.TryRead(out var chunk)) + while (_chunksBuffer.Reader.TryRead(out var chunkWithAction)) { // We send the credit to the server to allow the server to send more messages // we request the credit before process the check to keep the network busy + + var (chunk, action) = chunkWithAction; try { if (Token.IsCancellationRequested) @@ -501,8 +503,18 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // // and close the task if (Token.IsCancellationRequested) break; - - await ParseChunk(chunk).ConfigureAwait(false); + switch (action) + { + case ChunkAction.Skip: + Logger?.LogDebug( + "The chunk {ChunkId} will be skipped for {EntityInfo}", + chunk.ChunkId, DumpEntityConfiguration()); + continue; // skip the chunk + case ChunkAction.TryToProcess: + // continue to process the chunk + await ParseChunk(chunk).ConfigureAwait(false); + break; + } } } @@ -596,6 +608,8 @@ private async Task Init() return; } + var skipChunk = ChunkAction.TryToProcess; + if (_config.Crc32 is not null) { var crcCalculated = BitConverter.ToUInt32( @@ -609,29 +623,16 @@ private async Task Init() DumpEntityConfiguration(), chunkConsumed); - // we can skip the chunk since the CRC does not match - switch (_config.Crc32.CrcFailureAction) + if (_config.Crc32.FailAction != null) { - case CrcFailureAction.SkipChunk: - Logger?.LogWarning( - "CRC32 fail. The chunk will be skipped by policy. {EntityInfo}, Chunk Consumed {ChunkConsumed}", - DumpEntityConfiguration(), chunkConsumed); - return; - - case CrcFailureAction.CloseConsumer: - Logger?.LogError( - "CRC32 fail. The consumer will be closed by policy. {EntityInfo}, Chunk Consumed {ChunkConsumed}", - DumpEntityConfiguration(), chunkConsumed); - // in this case we close the consumer - await Close().ConfigureAwait(false); - break; - default: - throw new ArgumentOutOfRangeException(); + // if the user has set the FailAction, we call it + // to allow the user to handle the chunk action + skipChunk = _config.Crc32.FailAction(this); } } } - await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false); + await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, skipChunk), Token).ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index bd9950d1..b73095a4 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -61,7 +61,11 @@ private async Task StandardConsumer(bool boot) ConnectionClosedHandler = async (closeReason) => { if (IsClosedNormally(closeReason)) + { + UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser, + [_consumerConfig.Stream]); return; + } try { @@ -153,7 +157,12 @@ private async Task SuperConsumer(bool boot) ConnectionClosedHandler = async (closeReason, partitionStream) => { if (IsClosedNormally(closeReason)) + { + UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser, + [partitionStream]); return; + } + await RandomWait().ConfigureAwait(false); try { diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index 2d173326..0e3c7ae8 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -50,7 +50,12 @@ private async Task SuperStreamProducer(bool boot) { await RandomWait().ConfigureAwait(false); if (IsClosedNormally(closeReason)) + { + UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser, + [partitionStream]); return; + } + try { var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; @@ -121,7 +126,11 @@ private async Task StandardProducer() { await RandomWait().ConfigureAwait(false); if (IsClosedNormally()) + { + UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser, + [_producerConfig.Stream]); return; + } await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream, ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false); diff --git a/RabbitMQ.Stream.Client/StreamCrc32.cs b/RabbitMQ.Stream.Client/StreamCrc32.cs index 86272495..4c0d4727 100644 --- a/RabbitMQ.Stream.Client/StreamCrc32.cs +++ b/RabbitMQ.Stream.Client/StreamCrc32.cs @@ -2,6 +2,8 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +using System; + namespace RabbitMQ.Stream.Client; public class StreamCrc32 : ICrc32 @@ -11,5 +13,5 @@ public byte[] Hash(byte[] data) return System.IO.Hashing.Crc32.Hash(data); } - public CrcFailureAction CrcFailureAction { get; set; } = CrcFailureAction.SkipChunk; + public Func FailAction { get; set; } } diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs new file mode 100644 index 00000000..a3c94545 --- /dev/null +++ b/Tests/Crc32Tests.cs @@ -0,0 +1,130 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; +using System.Threading.Tasks; +using RabbitMQ.Stream.Client; +using RabbitMQ.Stream.Client.Reliable; +using Xunit; +using Xunit.Abstractions; + +namespace Tests; + +public class FirstCrc32IsWrong : ICrc32 +{ + private int _callCount = 0; + + public byte[] Hash(byte[] data) + { + _callCount++; + return _callCount == 1 + ? + // Return a wrong hash on the first call to simulate a CRC32 failure + // second call will return the correct hash + [0x00, 0x00, 0x00, 0x04] + : System.IO.Hashing.Crc32.Hash(data); + } + + public Func FailAction { get; set; } +} + +public class Crc32Tests(ITestOutputHelper testOutputHelper) +{ + /// + /// Tests the Crc32 functionality of the consumer. + /// In this case, the first Crc32 is wrong, so the consumer should skip the chunk. + /// So the consumer should receive the second chunk. + /// + [Fact] + public async Task Crc32ShouldSkipChunk() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var completionSource = new TaskCompletionSource(); + var consumerConfig = new ConsumerConfig(system, stream) + { + Crc32 = new FirstCrc32IsWrong() { FailAction = (_) => ChunkAction.Skip }, + InitialCredits = 1, + MessageHandler = (_, _, messageContext, _) => + { + completionSource.TrySetResult(messageContext); + return Task.CompletedTask; + } + }; + var consumer = await Consumer.Create(consumerConfig); + await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); + await SystemUtils.WaitAsync(); + await SystemUtils.PublishMessages(system, stream, 5, "2", testOutputHelper); + var messageContext = await completionSource.Task; + Assert.True(messageContext.Offset == 3); + Assert.True(1 != messageContext.ChunkId); + Assert.True(5 == messageContext.ChunkMessagesCount); + Assert.True(consumer.IsOpen()); + await consumer.Close(); + await SystemUtils.CleanUpStreamSystem(system, stream); + } + + /// + /// Tests the Crc32 functionality of the consumer. + /// Simulate a CRC32 failure on the first chunk, + /// and add a custom action to close the consumer. + /// the consumer should be closed and the chunk should be skipped. + /// + [Fact] + public async Task Crc32ShouldCloseTheConsumer() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var consumerConfig = new ConsumerConfig(system, stream) + { + Crc32 = new FirstCrc32IsWrong() + { + FailAction = consumer => + { + consumer.Close(); + return ChunkAction.Skip; // Skip the chunk and close the consumer + } + }, + MessageHandler = (_, _, _, _) => Task.CompletedTask + }; + + var consumer = await Consumer.Create(consumerConfig); + await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); + await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(500)); + + Assert.False(consumer.IsOpen()); + await SystemUtils.CleanUpStreamSystem(system, stream); + } + + /// + /// Here we test an edge case where Crc32 is wrong, + /// but the consumer should still process the chunk. + /// by given the FailAction as TryToProcess. + /// In real life when a a CRC32 is wrong the consumer can't process the chunk, + /// this is to give more flexibility to the user. + /// + [Fact] + public async Task Crc32ShouldParseTheChunk() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var completionSource = new TaskCompletionSource(); + var consumerConfig = new ConsumerConfig(system, stream) + { + Crc32 = new FirstCrc32IsWrong() { FailAction = (_) => ChunkAction.TryToProcess }, + MessageHandler = (_, _, messageContext, _) => + { + completionSource.TrySetResult(messageContext); + return Task.CompletedTask; + } + }; + var consumer = await Consumer.Create(consumerConfig); + await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); + await SystemUtils.WaitAsync(); + var messageContext = await completionSource.Task; + Assert.True(messageContext.Offset == 0); + Assert.True(0 == messageContext.ChunkId); + Assert.True(3 == messageContext.ChunkMessagesCount); + Assert.True(consumer.IsOpen()); + await consumer.Close(); + await SystemUtils.CleanUpStreamSystem(system, stream); + } +} diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index 8a65945d..1d4eace3 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -18,10 +18,7 @@ namespace Tests; public class ReliableTests { - private readonly ICrc32 _crc32 = new StreamCrc32() - { - CrcFailureAction = CrcFailureAction.CloseConsumer - }; + private readonly ICrc32 _crc32 = new StreamCrc32() { }; private readonly ITestOutputHelper _testOutputHelper; public ReliableTests(ITestOutputHelper testOutputHelper) @@ -51,7 +48,8 @@ public async Task MessageWithoutConfirmationRaiseTimeout() ); confirmationPipe.Start(); confirmationPipe.AddUnConfirmedMessage(1, new Message(Encoding.UTF8.GetBytes($"hello"))); - confirmationPipe.AddUnConfirmedMessage(2, new List() { new Message(Encoding.UTF8.GetBytes($"hello")) }); + confirmationPipe.AddUnConfirmedMessage(2, + new List() { new Message(Encoding.UTF8.GetBytes($"hello")) }); new Utils(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); Assert.Equal(2, await confirmationTask.Task); Assert.Equal(2, l.Count); diff --git a/docs/Documentation/ConsumerUsage.cs b/docs/Documentation/ConsumerUsage.cs index 2d470003..937f419a 100644 --- a/docs/Documentation/ConsumerUsage.cs +++ b/docs/Documentation/ConsumerUsage.cs @@ -140,7 +140,10 @@ public static async Task CreateConsumerWithCrc() streamSystem, "my-stream") { - Crc32 = new StreamCrc32(), // <2> + Crc32 = new StreamCrc32() + { + FailAction = () => ChunkAction.Skip // <1> + }, // <2> OffsetSpec = new OffsetTypeTimestamp(), // end::consumer-creation-crc[] MessageHandler = async (stream, consumer, context, message) => // <4> From adcb62fd52d16c410b9d0c4c971f378f5170293a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 14:38:41 +0200 Subject: [PATCH 07/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/ICrc32.cs | 9 ++++++++- docs/Documentation/ConsumerUsage.cs | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/RabbitMQ.Stream.Client/ICrc32.cs b/RabbitMQ.Stream.Client/ICrc32.cs index 770dfccd..022c9c66 100644 --- a/RabbitMQ.Stream.Client/ICrc32.cs +++ b/RabbitMQ.Stream.Client/ICrc32.cs @@ -12,11 +12,11 @@ public enum ChunkAction /// The consumer will TryToProcess the Chunk. /// TryToProcess, + /// /// The consumer will Skip the Chunk and continue processing the next message. /// Skip - } /// @@ -28,6 +28,13 @@ public enum ChunkAction public interface ICrc32 { byte[] Hash(byte[] data); + + /// + /// FailAction is called when the Crc32 check fails. + /// The user can assign a function that returns a . + /// It is possible to add custom logic to handle the failure, such as logging. + /// The code here should be safe + /// Func FailAction { get; set; } } } diff --git a/docs/Documentation/ConsumerUsage.cs b/docs/Documentation/ConsumerUsage.cs index 937f419a..3fe248f3 100644 --- a/docs/Documentation/ConsumerUsage.cs +++ b/docs/Documentation/ConsumerUsage.cs @@ -142,7 +142,7 @@ public static async Task CreateConsumerWithCrc() { Crc32 = new StreamCrc32() { - FailAction = () => ChunkAction.Skip // <1> + FailAction = (consumerInstance) => ChunkAction.Skip // <1> }, // <2> OffsetSpec = new OffsetTypeTimestamp(), // end::consumer-creation-crc[] From 3bf528e3d1d1eab32650fb38c77c9f8664eed13d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 14:50:56 +0200 Subject: [PATCH 08/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- Tests/Crc32Tests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs index a3c94545..72010b8f 100644 --- a/Tests/Crc32Tests.cs +++ b/Tests/Crc32Tests.cs @@ -56,9 +56,7 @@ public async Task Crc32ShouldSkipChunk() await SystemUtils.WaitAsync(); await SystemUtils.PublishMessages(system, stream, 5, "2", testOutputHelper); var messageContext = await completionSource.Task; - Assert.True(messageContext.Offset == 3); Assert.True(1 != messageContext.ChunkId); - Assert.True(5 == messageContext.ChunkMessagesCount); Assert.True(consumer.IsOpen()); await consumer.Close(); await SystemUtils.CleanUpStreamSystem(system, stream); From 2364e2f656e77845a24ed18fd1f57e9c7e3dd227 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 15:27:09 +0200 Subject: [PATCH 09/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- Tests/Crc32Tests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs index 72010b8f..603e18a2 100644 --- a/Tests/Crc32Tests.cs +++ b/Tests/Crc32Tests.cs @@ -118,9 +118,7 @@ public async Task Crc32ShouldParseTheChunk() await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); await SystemUtils.WaitAsync(); var messageContext = await completionSource.Task; - Assert.True(messageContext.Offset == 0); Assert.True(0 == messageContext.ChunkId); - Assert.True(3 == messageContext.ChunkMessagesCount); Assert.True(consumer.IsOpen()); await consumer.Close(); await SystemUtils.CleanUpStreamSystem(system, stream); From 81cf2c6a8fb9de8261ea8e84661b35bc913f04f3 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 15:46:35 +0200 Subject: [PATCH 10/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- Tests/Crc32Tests.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs index 603e18a2..4eb6ee65 100644 --- a/Tests/Crc32Tests.cs +++ b/Tests/Crc32Tests.cs @@ -97,7 +97,7 @@ public async Task Crc32ShouldCloseTheConsumer() /// Here we test an edge case where Crc32 is wrong, /// but the consumer should still process the chunk. /// by given the FailAction as TryToProcess. - /// In real life when a a CRC32 is wrong the consumer can't process the chunk, + /// In real life when a CRC32 is wrong the consumer can't process the chunk, /// this is to give more flexibility to the user. /// [Fact] @@ -118,7 +118,11 @@ public async Task Crc32ShouldParseTheChunk() await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); await SystemUtils.WaitAsync(); var messageContext = await completionSource.Task; - Assert.True(0 == messageContext.ChunkId); + if (messageContext.ChunkId != 0) + { + Assert.Fail($"Expected the first chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}"); + } + Assert.True(consumer.IsOpen()); await consumer.Close(); await SystemUtils.CleanUpStreamSystem(system, stream); From 5f633b79daffcb7f7b4a80ee87b0633b9d737e60 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 15:56:12 +0200 Subject: [PATCH 11/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- Tests/Crc32Tests.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs index 4eb6ee65..1b0b2e37 100644 --- a/Tests/Crc32Tests.cs +++ b/Tests/Crc32Tests.cs @@ -53,10 +53,13 @@ public async Task Crc32ShouldSkipChunk() }; var consumer = await Consumer.Create(consumerConfig); await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); - await SystemUtils.WaitAsync(); + await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(700)); await SystemUtils.PublishMessages(system, stream, 5, "2", testOutputHelper); var messageContext = await completionSource.Task; - Assert.True(1 != messageContext.ChunkId); + if (messageContext.ChunkId == 0) + { + Assert.Fail($"Expected the second chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}"); + } Assert.True(consumer.IsOpen()); await consumer.Close(); await SystemUtils.CleanUpStreamSystem(system, stream); From 7767f9de5ae7a67de2be78db886d66a64cbcfdae Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 16:01:46 +0200 Subject: [PATCH 12/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- Tests/Crc32Tests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs index 1b0b2e37..60b66fd2 100644 --- a/Tests/Crc32Tests.cs +++ b/Tests/Crc32Tests.cs @@ -60,6 +60,7 @@ public async Task Crc32ShouldSkipChunk() { Assert.Fail($"Expected the second chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}"); } + Assert.True(consumer.IsOpen()); await consumer.Close(); await SystemUtils.CleanUpStreamSystem(system, stream); From 172ab72790db20dfdefc4edd6407aaa0ffce2a90 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 16:54:46 +0200 Subject: [PATCH 13/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 25 ++++++++++++--------- Tests/Crc32Tests.cs | 31 +++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index cbdf72a0..76f4f7d7 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -506,14 +506,18 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // switch (action) { case ChunkAction.Skip: - Logger?.LogDebug( + // the chunk will be skipped due of CRC32 fail + Logger?.LogWarning( "The chunk {ChunkId} will be skipped for {EntityInfo}", chunk.ChunkId, DumpEntityConfiguration()); continue; // skip the chunk case ChunkAction.TryToProcess: - // continue to process the chunk + // That's what happens most of the time, and this is the default action + // Process the chunk await ParseChunk(chunk).ConfigureAwait(false); break; + default: + throw new ArgumentOutOfRangeException(); } } } @@ -608,7 +612,7 @@ private async Task Init() return; } - var skipChunk = ChunkAction.TryToProcess; + var chunkAction = ChunkAction.TryToProcess; if (_config.Crc32 is not null) { @@ -623,16 +627,17 @@ private async Task Init() DumpEntityConfiguration(), chunkConsumed); - if (_config.Crc32.FailAction != null) - { - // if the user has set the FailAction, we call it - // to allow the user to handle the chunk action - skipChunk = _config.Crc32.FailAction(this); - } + // if the user has set the FailAction, we call it + // to allow the user to handle the chunk action + // if the FailAction is not set, we skip the chunk + chunkAction = _config.Crc32.FailAction?.Invoke(this) ?? ChunkAction.Skip; } } - await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, skipChunk), Token).ConfigureAwait(false); + // if the chunkAction is passed to the _chunksBuffer because the ProcessChunks task + // asks for the credits. If we skip here no more credits will be requested + await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, chunkAction), Token) + .ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs index 60b66fd2..08755f58 100644 --- a/Tests/Crc32Tests.cs +++ b/Tests/Crc32Tests.cs @@ -66,6 +66,37 @@ public async Task Crc32ShouldSkipChunk() await SystemUtils.CleanUpStreamSystem(system, stream); } + [Fact] + public async Task Crc32ShouldSkipChunkWithTheDefaultBehavior() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var completionSource = new TaskCompletionSource(); + var consumerConfig = new ConsumerConfig(system, stream) + { + // FailAction is not set, so it will use the default behavior which is to skip the chunk + Crc32 = new FirstCrc32IsWrong() { }, + InitialCredits = 1, + MessageHandler = (_, _, messageContext, _) => + { + completionSource.TrySetResult(messageContext); + return Task.CompletedTask; + } + }; + var consumer = await Consumer.Create(consumerConfig); + await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); + await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(700)); + await SystemUtils.PublishMessages(system, stream, 5, "2", testOutputHelper); + var messageContext = await completionSource.Task; + if (messageContext.ChunkId == 0) + { + Assert.Fail($"Expected the second chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}"); + } + + Assert.True(consumer.IsOpen()); + await consumer.Close(); + await SystemUtils.CleanUpStreamSystem(system, stream); + } + /// /// Tests the Crc32 functionality of the consumer. /// Simulate a CRC32 failure on the first chunk, From 2e040f62341a7c5363909ad4a7de6164f659fc6a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 17:11:25 +0200 Subject: [PATCH 14/19] Add custom function in case of the crc32 fails and the user want to add a custom code Signed-off-by: Gabriele Santomaggio --- Tests/Crc32Tests.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs index 08755f58..d2fa4575 100644 --- a/Tests/Crc32Tests.cs +++ b/Tests/Crc32Tests.cs @@ -123,8 +123,7 @@ public async Task Crc32ShouldCloseTheConsumer() var consumer = await Consumer.Create(consumerConfig); await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(500)); - - Assert.False(consumer.IsOpen()); + await SystemUtils.WaitUntilAsync(() => !consumer.IsOpen()); await SystemUtils.CleanUpStreamSystem(system, stream); } From a6c6eec906e1848df1556365457cde203136c216 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 18:01:30 +0200 Subject: [PATCH 15/19] Fix Reliable client project Add documentation Signed-off-by: Gabriele Santomaggio --- Directory.Packages.props | 1 + RabbitMQ.Stream.Client/StreamCrc32.cs | 2 +- docs/Documentation/ConsumerUsage.cs | 6 +-- docs/ReliableClient/ReliableClient.csproj | 11 ++--- docs/asciidoc/api.adoc | 53 +++++++++++++++-------- 5 files changed, 46 insertions(+), 27 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index adf37d6d..e4e1a379 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -5,6 +5,7 @@ + diff --git a/RabbitMQ.Stream.Client/StreamCrc32.cs b/RabbitMQ.Stream.Client/StreamCrc32.cs index 4c0d4727..ffbdc602 100644 --- a/RabbitMQ.Stream.Client/StreamCrc32.cs +++ b/RabbitMQ.Stream.Client/StreamCrc32.cs @@ -13,5 +13,5 @@ public byte[] Hash(byte[] data) return System.IO.Hashing.Crc32.Hash(data); } - public Func FailAction { get; set; } + public Func FailAction { get; set; } = null; } diff --git a/docs/Documentation/ConsumerUsage.cs b/docs/Documentation/ConsumerUsage.cs index 3fe248f3..913da05c 100644 --- a/docs/Documentation/ConsumerUsage.cs +++ b/docs/Documentation/ConsumerUsage.cs @@ -140,10 +140,10 @@ public static async Task CreateConsumerWithCrc() streamSystem, "my-stream") { - Crc32 = new StreamCrc32() + Crc32 = new StreamCrc32() // <1> { - FailAction = (consumerInstance) => ChunkAction.Skip // <1> - }, // <2> + FailAction = (consumerInstance) => ChunkAction.Skip // <2> + }, OffsetSpec = new OffsetTypeTimestamp(), // end::consumer-creation-crc[] MessageHandler = async (stream, consumer, context, message) => // <4> diff --git a/docs/ReliableClient/ReliableClient.csproj b/docs/ReliableClient/ReliableClient.csproj index 27b2351e..da4b0615 100644 --- a/docs/ReliableClient/ReliableClient.csproj +++ b/docs/ReliableClient/ReliableClient.csproj @@ -7,16 +7,17 @@ net8.0;net9.0 - - - - + + - + + + + Always diff --git a/docs/asciidoc/api.adoc b/docs/asciidoc/api.adoc index ddf6d7f4..dd4e3b53 100644 --- a/docs/asciidoc/api.adoc +++ b/docs/asciidoc/api.adoc @@ -5,7 +5,7 @@ ==== Overview This section describes the API to connect to the RabbitMQ Stream Plugin, publish messages, and consume messages. -There are 3 main interfaces: +There are three main interfaces: * `RabbitMQ.Stream.Client` for connecting to a node and optionally managing streams. * `RabbitMQ.Stream.Client.Reliable.Producer` to publish messages. @@ -141,8 +141,9 @@ The following table sums up the main settings to create an `StreamSystem` using [[connection-pool]] ===== Connection pool + Introduced on version 1.8.0. -With the connection pool you can define how many producers and consumers can be created on a single connection and the `ConnectionCloseConfig` +With the connection pool you can define how many producers and consumers can be created on a single connection and the `ConnectionCloseConfig` [source] ---- @@ -164,7 +165,8 @@ A high value can reduce the number of connections to the server but it could red A low value can increase the number of connections to the server but it could increase the performance of the producer and the consumer. -The consumers share the same handler, so if you have a high number of consumers per connection, the handler could be a bottleneck. It means that if there is a slow consumer all the other consumers could be slow. +The consumers share the same handler, so if you have a high number of consumers per connection, the handler could be a bottleneck. +It means that if there is a slow consumer all the other consumers could be slow. TIP: You can use different StreamSystemConfig like: @@ -186,6 +188,7 @@ streamSystemToIncreaseThePerformances = new StreamSystemConfig{ } } ---- + There is not a magic number, you have to test and evaluate the best value for your use case. The `ConnectionCloseConfig` defines the policy to close the connection when the last producer or consumer is closed. @@ -194,7 +197,6 @@ The `ConnectionCloseConfig` defines the policy to close the connection when the The policy `CloseWhenEmpty` covers the standard use cases when the producers or consumers have long life running. - The policy `CloseWhenEmptyAndIdle` is useful when producers or consumers have short live and the pool has to be fast to create a new entity. The parameter `IdleTime` defines the time to wait before closing the connection when the last producer or consumer is closed. The parameter `CheckIdleTime` defines the time to check if the connection is idle. @@ -211,7 +213,6 @@ The parameter `CheckIdleTime` defines the time to check if the connection is idl Note: You can't close the stream systems if there are producers or consumers still running with the `CloseWhenEmptyAndIdle` policy. - [[entity-status]] [[address-resolver]] ===== When a Load Balancer is in Use @@ -396,7 +397,7 @@ This value is valid only for the `Send(Message)` method. |`StatusChanged` |The callback invoked when the producer status changes. See <> for more details. -|`null` +|`null` |=== @@ -815,7 +816,7 @@ The following table sums up the main settings to create a `Consumer` with `Consu |`ICrc32` |The <> to use to validate the chunk server crc32 . -|`null` (no CRC32) +| `StreamCrc32()` |`StatusChanged` |The callback invoked when the consumer status changes. See <> for more details. @@ -836,8 +837,8 @@ See the <> to find out more about the diffe ===== Check the CRC on Delivery RabbitMQ Stream provides a CRC32 checksum on each chunk. -The client library can check the checksum before parse the chunk and throw an `CrcException` exception if the validation fails. -By default the CRC32 checksum is not enabled, to enable it you need to set the `ICrc32` interface in the `ConsumerConfig`: +The client library can check the checksum before parsing the chunk. +By default the CRC32 checksum enabled (with the class `StreamCrc32()` ), to disable it you need to set null to `ICrc32` interface in the `ConsumerConfig`: .Checking the CRC32 checksum on the chunk [source,c#,indent=0] @@ -847,13 +848,30 @@ include::{test-examples}/ConsumerUsage.cs[tag=consumer-creation-crc] <1> An implementation of the `ICrc32` interface. You are free to use any implementation you want. -The client is tested with `System.IO.Hashing`. -`System.IO.Hashing` is not shipped with the client library. -<2> Set the `ICrc32` implementation in the `ConsumerConfig` +By default the client library uses `StreamCrc32` which is a simple implementation of the `ICrc32` interface. +<2> The function FailAction is called when the CRC32 checksum is not valid. + +`FailAction` return types: + +[%header,cols=2*] +|=== +|Parameter Name +|Description -It is recommended to use it. -It _could_ reduce the performance of the consumer. -It depends on the use case. +|`ChunkAction.Skip` +|Skip the chunk and continue consuming the next chunk. + +|`ChunkAction.TryToProcess` +|Try to process the chunk even if the CRC32 checksum is not valid. +|=== + +If `FailAction` is `null` the library will use `ChunkAction.Skip` as default value. + +`FailAction` has `sourceConsumer` as parameter, which is the consumer that failed to validate the CRC32 checksum. +the code in the `FailAction` should be short, fast and safe, as it is executed in the consumer thread. + +It is recommended to leave it enabled, as it allows the client library to detect corrupted chunks and avoid parsing them. +Disabling the CRC32 _could_ improve performance, but it is not recommended as it can lead to parsing corrupted chunks and unexpected behavior. [[consumer-reconnect-strategy]] [[specifying-an-offset]] @@ -940,7 +958,6 @@ include::{test-examples}/ConsumerUsage.cs[tag=manual-tracking-defaults] The snippet above uses `consumer.StoreOffset(context.Offset)` to store at the offset of the current message. It is possible to store the offset in a more generic way with `StreamSystem.StoreOffset(reference,stream, offsetValue)` - ====== Considerations On Offset Tracking _When to store offsets?_ Avoid storing offsets too often or, worse, for each message. @@ -1073,13 +1090,14 @@ include::{test-examples}/ConsumerUsage.cs[tag=sac-consumer-update-listener] <2> Enable single active consumer <3> Handle `ConsumerUpdateListener` callback - [[entity-status]] ==== Producer/Consumer change status callback + `Producer` and `Consumer` classes provide a callback to handle the status change. It is possible to configure the event using the configuration `StatusChanged` property. like the following snippet: + [source,c#,indent=0] -------- var conf = new ConsumerConfig(system, stream) @@ -1148,7 +1166,6 @@ the `statusInfo` contains the following information: A full example of the status change callback can be found in https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient[here]. - [[low-high-level-classes]] ==== Low Level and High Level classes From 63d93103b1680ceef579077b280079c06c8a5cd5 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 19:09:02 +0200 Subject: [PATCH 16/19] small fix Signed-off-by: Gabriele Santomaggio --- Directory.Packages.props | 2 +- docs/Documentation/GettingStarted.cs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index e4e1a379..f89f9b50 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -5,7 +5,6 @@ - @@ -21,6 +20,7 @@ + diff --git a/docs/Documentation/GettingStarted.cs b/docs/Documentation/GettingStarted.cs index 02c3fd6c..3c92ba76 100644 --- a/docs/Documentation/GettingStarted.cs +++ b/docs/Documentation/GettingStarted.cs @@ -121,11 +121,11 @@ await producer.Send( // <6> } - confirmationTaskCompletionSource.Task.Wait(); // <7> + await confirmationTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); // <7> await producer.Close().ConfigureAwait(false); // <8> // end::sample-producer[] - var consumerTaskCompletionSource = new TaskCompletionSource(); + var consumerTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumerCount = 0; // Create a consumer // tag::sample-consumer[] @@ -149,7 +149,7 @@ await producer.Send( // <6> consumerLogger // <4> ) .ConfigureAwait(false); - consumerTaskCompletionSource.Task.Wait(); // <5> + await consumerTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); // <5> await consumer.Close().ConfigureAwait(false); // <6> // end::sample-consumer[] From a88f105bcd7361a78220a18db2bae80e893efe7b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 19:32:26 +0200 Subject: [PATCH 17/19] small doc fix Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/IConsumer.cs | 2 -- RabbitMQ.Stream.Client/ICrc32.cs | 5 +++-- RabbitMQ.Stream.Client/RawConsumer.cs | 5 ++--- RabbitMQ.Stream.Client/StreamCrc32.cs | 8 +++++++- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/RabbitMQ.Stream.Client/IConsumer.cs b/RabbitMQ.Stream.Client/IConsumer.cs index 0dd1ea50..c7c6bda3 100644 --- a/RabbitMQ.Stream.Client/IConsumer.cs +++ b/RabbitMQ.Stream.Client/IConsumer.cs @@ -71,8 +71,6 @@ public ushort InitialCredits } } - // enables the check of the crc on the delivery. - // the server will send the crc for each chunk and the client will check it. // It is enabled by default. You can disable it by setting it to null. // It is recommended to keep it enabled. Disable it only for performance reasons. public ICrc32 Crc32 { get; set; } = new StreamCrc32(); diff --git a/RabbitMQ.Stream.Client/ICrc32.cs b/RabbitMQ.Stream.Client/ICrc32.cs index 022c9c66..50ccd815 100644 --- a/RabbitMQ.Stream.Client/ICrc32.cs +++ b/RabbitMQ.Stream.Client/ICrc32.cs @@ -9,12 +9,13 @@ namespace RabbitMQ.Stream.Client public enum ChunkAction { /// - /// The consumer will TryToProcess the Chunk. + /// The consumer will try to process the Chunk. /// TryToProcess, /// - /// The consumer will Skip the Chunk and continue processing the next message. + /// The consumer will skip the Chunk and continue processing the next Chunk. + /// All the messages in the Chunk will be skipped. /// Skip } diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 76f4f7d7..54c3d53f 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -513,7 +513,6 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // continue; // skip the chunk case ChunkAction.TryToProcess: // That's what happens most of the time, and this is the default action - // Process the chunk await ParseChunk(chunk).ConfigureAwait(false); break; default: @@ -634,8 +633,8 @@ private async Task Init() } } - // if the chunkAction is passed to the _chunksBuffer because the ProcessChunks task - // asks for the credits. If we skip here no more credits will be requested + // The chunkAction is passed to the _chunksBuffer because the ProcessChunks task + // asks for the credits in a Task. If we skip the chunk here no more credits will be requested await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, chunkAction), Token) .ConfigureAwait(false); } diff --git a/RabbitMQ.Stream.Client/StreamCrc32.cs b/RabbitMQ.Stream.Client/StreamCrc32.cs index ffbdc602..fe1177dc 100644 --- a/RabbitMQ.Stream.Client/StreamCrc32.cs +++ b/RabbitMQ.Stream.Client/StreamCrc32.cs @@ -6,12 +6,18 @@ namespace RabbitMQ.Stream.Client; +/// +/// Default implementation of using the System.IO.Hashing.Crc32 class. +/// The consumer uses this implementation by default to perform CRC32 checks on chunk received from the server. +/// FailAction is set to null by default, meaning that no custom action is taken when the CRC32 check fails. +/// It uses the default action of skipping the chunk. +/// public class StreamCrc32 : ICrc32 { public byte[] Hash(byte[] data) { return System.IO.Hashing.Crc32.Hash(data); } - + public Func FailAction { get; set; } = null; } From 1c4d253a98a17b42c3e668cabeec21733b2c6c08 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 19:35:32 +0200 Subject: [PATCH 18/19] small doc fix Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/StreamCrc32.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RabbitMQ.Stream.Client/StreamCrc32.cs b/RabbitMQ.Stream.Client/StreamCrc32.cs index fe1177dc..592890fc 100644 --- a/RabbitMQ.Stream.Client/StreamCrc32.cs +++ b/RabbitMQ.Stream.Client/StreamCrc32.cs @@ -18,6 +18,6 @@ public byte[] Hash(byte[] data) { return System.IO.Hashing.Crc32.Hash(data); } - + public Func FailAction { get; set; } = null; } From 2774af9e9024e6e21647b6b923a4d3dbc81b462b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Jun 2025 19:55:53 +0200 Subject: [PATCH 19/19] remove tests for .bet 9 Signed-off-by: Gabriele Santomaggio --- Tests/Tests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj index 4665a6d9..9342b265 100644 --- a/Tests/Tests.csproj +++ b/Tests/Tests.csproj @@ -6,8 +6,8 @@ Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. Broadcom Broadcom Inc. and/or its subsidiaries. - net8.0;net9.0 false + net8.0