diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 6fccf392..f4a12a5f 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -8,6 +8,12 @@ jobs: name: build/test on windows-latest runs-on: windows-latest steps: + - name: Clone repository + uses: actions/checkout@v4 + - name: Setup .NET SDK + uses: actions/setup-dotnet@v4 + with: + global-json-file: global.json - uses: actions/checkout@v4 - uses: actions/cache@v4 with: @@ -35,6 +41,12 @@ jobs: name: build/test on ubuntu-latest runs-on: ubuntu-latest steps: + - name: Clone repository + uses: actions/checkout@v4 + - name: Setup .NET SDK + uses: actions/setup-dotnet@v4 + with: + global-json-file: global.json - uses: actions/checkout@v4 - uses: actions/setup-dotnet@v4 with: diff --git a/Directory.Packages.props b/Directory.Packages.props index adf37d6d..f89f9b50 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -20,6 +20,7 @@ + diff --git a/RabbitMQ.Stream.Client/ClientExceptions.cs b/RabbitMQ.Stream.Client/ClientExceptions.cs index 71d47c6f..95e2db59 100644 --- a/RabbitMQ.Stream.Client/ClientExceptions.cs +++ b/RabbitMQ.Stream.Client/ClientExceptions.cs @@ -177,14 +177,6 @@ public UnknownCommandException(string s) } } - public class CrcException : Exception - { - public CrcException(string s) - : base(s) - { - } - } - public class TooManyConnectionsException : Exception { public TooManyConnectionsException(string s) diff --git a/RabbitMQ.Stream.Client/IConsumer.cs b/RabbitMQ.Stream.Client/IConsumer.cs index b44e48b2..c7c6bda3 100644 --- a/RabbitMQ.Stream.Client/IConsumer.cs +++ b/RabbitMQ.Stream.Client/IConsumer.cs @@ -71,10 +71,9 @@ public ushort InitialCredits } } - // enables the check of the crc on the delivery. - // the server will send the crc for each chunk and the client will check it. - // It is not enabled by default because it is could reduce the performance. - public ICrc32 Crc32 { get; set; } = null; + // It is enabled by default. You can disable it by setting it to null. + // It is recommended to keep it enabled. Disable it only for performance reasons. + public ICrc32 Crc32 { get; set; } = new StreamCrc32(); } public class ConsumerInfo : Info @@ -90,6 +89,7 @@ public ConsumerInfo(string stream, string reference, string identifier, List + /// The consumer will try to process the Chunk. + /// + TryToProcess, + + /// + /// The consumer will skip the Chunk and continue processing the next Chunk. + /// All the messages in the Chunk will be skipped. + /// + Skip + } + /// /// ICrc32 defines an interface for implementing crc32 hashing. /// Library users who wish to perform crc32 checks on data from RabbitMQ @@ -13,5 +29,13 @@ namespace RabbitMQ.Stream.Client public interface ICrc32 { byte[] Hash(byte[] data); + + /// + /// FailAction is called when the Crc32 check fails. + /// The user can assign a function that returns a . + /// It is possible to add custom logic to handle the failure, such as logging. + /// The code here should be safe + /// + Func FailAction { get; set; } } } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index e66563b2..a89ca90f 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -35,6 +35,9 @@ RabbitMQ.Stream.Client.BindingsSuperStreamSpec.BindingsSuperStreamSpec(string Na RabbitMQ.Stream.Client.Chunk.Crc.get -> uint RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte +RabbitMQ.Stream.Client.ChunkAction +RabbitMQ.Stream.Client.ChunkAction.Skip = 1 -> RabbitMQ.Stream.Client.ChunkAction +RabbitMQ.Stream.Client.ChunkAction.TryToProcess = 0 -> RabbitMQ.Stream.Client.ChunkAction RabbitMQ.Stream.Client.Client.ClientId.get -> string RabbitMQ.Stream.Client.Client.ClientId.init -> void RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary @@ -109,8 +112,6 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void RabbitMQ.Stream.Client.ConsumerInfo RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier, System.Collections.Generic.List partitions) -> void RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string -RabbitMQ.Stream.Client.CrcException -RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void RabbitMQ.Stream.Client.CreateConsumerException.CreateConsumerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void RabbitMQ.Stream.Client.CreateException RabbitMQ.Stream.Client.CreateException.CreateException(string s) -> void @@ -175,6 +176,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void RabbitMQ.Stream.Client.ICrc32 +RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func +RabbitMQ.Stream.Client.ICrc32.FailAction.set -> void RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[] RabbitMQ.Stream.Client.Info RabbitMQ.Stream.Client.Info.Identifier.get -> string @@ -322,6 +325,11 @@ RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType +RabbitMQ.Stream.Client.StreamCrc32 +RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func +RabbitMQ.Stream.Client.StreamCrc32.FailAction.set -> void +RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[] +RabbitMQ.Stream.Client.StreamCrc32.StreamCrc32() -> void RabbitMQ.Stream.Client.StreamStats RabbitMQ.Stream.Client.StreamStats.CommittedChunkId() -> ulong RabbitMQ.Stream.Client.StreamStats.FirstOffset() -> ulong diff --git a/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj b/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj index 70ea8824..081a7edc 100644 --- a/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj +++ b/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj @@ -39,6 +39,7 @@ + diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index f34b367f..54c3d53f 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -130,14 +130,16 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable { private readonly RawConsumerConfig _config; - private readonly Channel _chunksBuffer; + private readonly Channel<(Chunk, ChunkAction)> _chunksBuffer; + private readonly ushort _initialCredits; // _completeSubscription is used to notify the ProcessChunks task // that the subscription is completed and so it can start to process the chunks // this is needed because the socket starts to receive the chunks before the subscription_id is // assigned. - private readonly TaskCompletionSource _completeSubscription = new(); + private readonly TaskCompletionSource _completeSubscription = + new(TaskCreationOptions.RunContinuationsAsynchronously); protected sealed override string DumpEntityConfiguration() { @@ -161,7 +163,8 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null); // _chunksBuffer is a channel that is used to buffer the chunks - _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) + + _chunksBuffer = Channel.CreateBounded<(Chunk, ChunkAction)>(new BoundedChannelOptions(_initialCredits) { AllowSynchronousContinuations = false, SingleReader = true, @@ -220,7 +223,6 @@ public async Task StoreOffset(ulong offset) /// MaybeLockDispatch is an optimization to avoid to lock the dispatch /// when the consumer is not single active consumer /// - private async Task MaybeLockDispatch() { if (_config.IsSingleActiveConsumer) @@ -266,8 +268,8 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int var slice = unCompressedData.Slice(compressOffset, 4); compressOffset += WireFormatting.ReadUInt32(ref slice, out var len); Debug.Assert(len > 0); - slice = unCompressedData.Slice(compressOffset, len); - Debug.Assert(slice.Length >= len); + var sliceMsg = unCompressedData.Slice(compressOffset, len); + Debug.Assert(sliceMsg.Length == len); compressOffset += (int)len; // Here we use the Message.From(ref ReadOnlySequence seq ..) method to parse the message @@ -275,7 +277,7 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int // Since the ParseChunk is async and we cannot use the ref SequenceReader reader // See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 for more details - var message = Message.From(ref slice, len); + var message = Message.From(ref sliceMsg, len); return message; } catch (Exception e) @@ -461,15 +463,18 @@ private void ProcessChunks() // need to wait the subscription is completed // else the _subscriberId could be incorrect _completeSubscription.Task.Wait(); + try { while (!Token.IsCancellationRequested && await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // { - while (_chunksBuffer.Reader.TryRead(out var chunk)) + while (_chunksBuffer.Reader.TryRead(out var chunkWithAction)) { // We send the credit to the server to allow the server to send more messages // we request the credit before process the check to keep the network busy + + var (chunk, action) = chunkWithAction; try { if (Token.IsCancellationRequested) @@ -498,8 +503,21 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // // and close the task if (Token.IsCancellationRequested) break; - - await ParseChunk(chunk).ConfigureAwait(false); + switch (action) + { + case ChunkAction.Skip: + // the chunk will be skipped due of CRC32 fail + Logger?.LogWarning( + "The chunk {ChunkId} will be skipped for {EntityInfo}", + chunk.ChunkId, DumpEntityConfiguration()); + continue; // skip the chunk + case ChunkAction.TryToProcess: + // That's what happens most of the time, and this is the default action + await ParseChunk(chunk).ConfigureAwait(false); + break; + default: + throw new ArgumentOutOfRangeException(); + } } } @@ -593,6 +611,8 @@ private async Task Init() return; } + var chunkAction = ChunkAction.TryToProcess; + if (_config.Crc32 is not null) { var crcCalculated = BitConverter.ToUInt32( @@ -606,13 +626,17 @@ private async Task Init() DumpEntityConfiguration(), chunkConsumed); - throw new CrcException( - $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " + - $"Chunk Consumed {chunkConsumed}"); + // if the user has set the FailAction, we call it + // to allow the user to handle the chunk action + // if the FailAction is not set, we skip the chunk + chunkAction = _config.Crc32.FailAction?.Invoke(this) ?? ChunkAction.Skip; } } - await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false); + // The chunkAction is passed to the _chunksBuffer because the ProcessChunks task + // asks for the credits in a Task. If we skip the chunk here no more credits will be requested + await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, chunkAction), Token) + .ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index bd9950d1..b73095a4 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -61,7 +61,11 @@ private async Task StandardConsumer(bool boot) ConnectionClosedHandler = async (closeReason) => { if (IsClosedNormally(closeReason)) + { + UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser, + [_consumerConfig.Stream]); return; + } try { @@ -153,7 +157,12 @@ private async Task SuperConsumer(bool boot) ConnectionClosedHandler = async (closeReason, partitionStream) => { if (IsClosedNormally(closeReason)) + { + UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser, + [partitionStream]); return; + } + await RandomWait().ConfigureAwait(false); try { diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index 2d173326..0e3c7ae8 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -50,7 +50,12 @@ private async Task SuperStreamProducer(bool boot) { await RandomWait().ConfigureAwait(false); if (IsClosedNormally(closeReason)) + { + UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser, + [partitionStream]); return; + } + try { var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; @@ -121,7 +126,11 @@ private async Task StandardProducer() { await RandomWait().ConfigureAwait(false); if (IsClosedNormally()) + { + UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser, + [_producerConfig.Stream]); return; + } await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream, ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false); diff --git a/RabbitMQ.Stream.Client/StreamCrc32.cs b/RabbitMQ.Stream.Client/StreamCrc32.cs new file mode 100644 index 00000000..592890fc --- /dev/null +++ b/RabbitMQ.Stream.Client/StreamCrc32.cs @@ -0,0 +1,23 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; + +namespace RabbitMQ.Stream.Client; + +/// +/// Default implementation of using the System.IO.Hashing.Crc32 class. +/// The consumer uses this implementation by default to perform CRC32 checks on chunk received from the server. +/// FailAction is set to null by default, meaning that no custom action is taken when the CRC32 check fails. +/// It uses the default action of skipping the chunk. +/// +public class StreamCrc32 : ICrc32 +{ + public byte[] Hash(byte[] data) + { + return System.IO.Hashing.Crc32.Hash(data); + } + + public Func FailAction { get; set; } = null; +} diff --git a/Tests/Crc32.cs b/Tests/Crc32.cs deleted file mode 100644 index 7d7b3df3..00000000 --- a/Tests/Crc32.cs +++ /dev/null @@ -1,15 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. - -using RabbitMQ.Stream.Client; - -namespace Tests; - -public class Crc32 : ICrc32 -{ - public byte[] Hash(byte[] data) - { - return System.IO.Hashing.Crc32.Hash(data); - } -} diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs new file mode 100644 index 00000000..d2fa4575 --- /dev/null +++ b/Tests/Crc32Tests.cs @@ -0,0 +1,164 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; +using System.Threading.Tasks; +using RabbitMQ.Stream.Client; +using RabbitMQ.Stream.Client.Reliable; +using Xunit; +using Xunit.Abstractions; + +namespace Tests; + +public class FirstCrc32IsWrong : ICrc32 +{ + private int _callCount = 0; + + public byte[] Hash(byte[] data) + { + _callCount++; + return _callCount == 1 + ? + // Return a wrong hash on the first call to simulate a CRC32 failure + // second call will return the correct hash + [0x00, 0x00, 0x00, 0x04] + : System.IO.Hashing.Crc32.Hash(data); + } + + public Func FailAction { get; set; } +} + +public class Crc32Tests(ITestOutputHelper testOutputHelper) +{ + /// + /// Tests the Crc32 functionality of the consumer. + /// In this case, the first Crc32 is wrong, so the consumer should skip the chunk. + /// So the consumer should receive the second chunk. + /// + [Fact] + public async Task Crc32ShouldSkipChunk() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var completionSource = new TaskCompletionSource(); + var consumerConfig = new ConsumerConfig(system, stream) + { + Crc32 = new FirstCrc32IsWrong() { FailAction = (_) => ChunkAction.Skip }, + InitialCredits = 1, + MessageHandler = (_, _, messageContext, _) => + { + completionSource.TrySetResult(messageContext); + return Task.CompletedTask; + } + }; + var consumer = await Consumer.Create(consumerConfig); + await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); + await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(700)); + await SystemUtils.PublishMessages(system, stream, 5, "2", testOutputHelper); + var messageContext = await completionSource.Task; + if (messageContext.ChunkId == 0) + { + Assert.Fail($"Expected the second chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}"); + } + + Assert.True(consumer.IsOpen()); + await consumer.Close(); + await SystemUtils.CleanUpStreamSystem(system, stream); + } + + [Fact] + public async Task Crc32ShouldSkipChunkWithTheDefaultBehavior() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var completionSource = new TaskCompletionSource(); + var consumerConfig = new ConsumerConfig(system, stream) + { + // FailAction is not set, so it will use the default behavior which is to skip the chunk + Crc32 = new FirstCrc32IsWrong() { }, + InitialCredits = 1, + MessageHandler = (_, _, messageContext, _) => + { + completionSource.TrySetResult(messageContext); + return Task.CompletedTask; + } + }; + var consumer = await Consumer.Create(consumerConfig); + await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); + await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(700)); + await SystemUtils.PublishMessages(system, stream, 5, "2", testOutputHelper); + var messageContext = await completionSource.Task; + if (messageContext.ChunkId == 0) + { + Assert.Fail($"Expected the second chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}"); + } + + Assert.True(consumer.IsOpen()); + await consumer.Close(); + await SystemUtils.CleanUpStreamSystem(system, stream); + } + + /// + /// Tests the Crc32 functionality of the consumer. + /// Simulate a CRC32 failure on the first chunk, + /// and add a custom action to close the consumer. + /// the consumer should be closed and the chunk should be skipped. + /// + [Fact] + public async Task Crc32ShouldCloseTheConsumer() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var consumerConfig = new ConsumerConfig(system, stream) + { + Crc32 = new FirstCrc32IsWrong() + { + FailAction = consumer => + { + consumer.Close(); + return ChunkAction.Skip; // Skip the chunk and close the consumer + } + }, + MessageHandler = (_, _, _, _) => Task.CompletedTask + }; + + var consumer = await Consumer.Create(consumerConfig); + await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); + await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(500)); + await SystemUtils.WaitUntilAsync(() => !consumer.IsOpen()); + await SystemUtils.CleanUpStreamSystem(system, stream); + } + + /// + /// Here we test an edge case where Crc32 is wrong, + /// but the consumer should still process the chunk. + /// by given the FailAction as TryToProcess. + /// In real life when a CRC32 is wrong the consumer can't process the chunk, + /// this is to give more flexibility to the user. + /// + [Fact] + public async Task Crc32ShouldParseTheChunk() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var completionSource = new TaskCompletionSource(); + var consumerConfig = new ConsumerConfig(system, stream) + { + Crc32 = new FirstCrc32IsWrong() { FailAction = (_) => ChunkAction.TryToProcess }, + MessageHandler = (_, _, messageContext, _) => + { + completionSource.TrySetResult(messageContext); + return Task.CompletedTask; + } + }; + var consumer = await Consumer.Create(consumerConfig); + await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper); + await SystemUtils.WaitAsync(); + var messageContext = await completionSource.Task; + if (messageContext.ChunkId != 0) + { + Assert.Fail($"Expected the first chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}"); + } + + Assert.True(consumer.IsOpen()); + await consumer.Close(); + await SystemUtils.CleanUpStreamSystem(system, stream); + } +} diff --git a/Tests/RawConsumerSystemTests.cs b/Tests/RawConsumerSystemTests.cs index 7328ea6e..b208feec 100644 --- a/Tests/RawConsumerSystemTests.cs +++ b/Tests/RawConsumerSystemTests.cs @@ -20,7 +20,7 @@ namespace Tests { public class ConsumerSystemTests { - private readonly ICrc32 _crc32 = new Crc32(); + private readonly ICrc32 _crc32 = new StreamCrc32(); private readonly ITestOutputHelper testOutputHelper; public ConsumerSystemTests(ITestOutputHelper testOutputHelper) diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index c44acd9c..1d4eace3 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -18,7 +18,7 @@ namespace Tests; public class ReliableTests { - private readonly ICrc32 _crc32 = new Crc32(); + private readonly ICrc32 _crc32 = new StreamCrc32() { }; private readonly ITestOutputHelper _testOutputHelper; public ReliableTests(ITestOutputHelper testOutputHelper) @@ -48,7 +48,8 @@ public async Task MessageWithoutConfirmationRaiseTimeout() ); confirmationPipe.Start(); confirmationPipe.AddUnConfirmedMessage(1, new Message(Encoding.UTF8.GetBytes($"hello"))); - confirmationPipe.AddUnConfirmedMessage(2, new List() { new Message(Encoding.UTF8.GetBytes($"hello")) }); + confirmationPipe.AddUnConfirmedMessage(2, + new List() { new Message(Encoding.UTF8.GetBytes($"hello")) }); new Utils(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); Assert.Equal(2, await confirmationTask.Task); Assert.Equal(2, l.Count); diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index ac45fdbc..d1d1dc92 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -19,7 +19,6 @@ namespace Tests; public class SuperStreamConsumerTests { - private readonly ICrc32 _crc32 = new Crc32(); private readonly ITestOutputHelper _testOutputHelper; public SuperStreamConsumerTests(ITestOutputHelper testOutputHelper) @@ -91,7 +90,6 @@ public async Task NumberOfMessagesConsumedShouldBeEqualsToPublished() var consumer = await system.CreateSuperStreamConsumer( new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { - Crc32 = _crc32, ClientProvidedName = clientProvidedName, Identifier = "super_stream_consumer_24680", OffsetSpec = diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj index 5db4744b..9342b265 100644 --- a/Tests/Tests.csproj +++ b/Tests/Tests.csproj @@ -6,8 +6,8 @@ Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. Broadcom Broadcom Inc. and/or its subsidiaries. - net8.0;net9.0 false + net8.0 @@ -15,7 +15,6 @@ - diff --git a/docs/Documentation/ConsumerUsage.cs b/docs/Documentation/ConsumerUsage.cs index fcc9abc1..913da05c 100644 --- a/docs/Documentation/ConsumerUsage.cs +++ b/docs/Documentation/ConsumerUsage.cs @@ -128,15 +128,7 @@ public static async Task CreateConsumerSingleUpdateListener() await streamSystem.Close().ConfigureAwait(false); } - // tag::consumer-creation-crc[] - private class UserCrc32 : ICrc32 // <1> - { - public byte[] Hash(byte[] data) - { - // Here we use the System.IO.Hashing.Crc32 implementation - return System.IO.Hashing.Crc32.Hash(data); - } - } + public static async Task CreateConsumerWithCrc() { @@ -148,7 +140,10 @@ public static async Task CreateConsumerWithCrc() streamSystem, "my-stream") { - Crc32 = new UserCrc32(), // <2> + Crc32 = new StreamCrc32() // <1> + { + FailAction = (consumerInstance) => ChunkAction.Skip // <2> + }, OffsetSpec = new OffsetTypeTimestamp(), // end::consumer-creation-crc[] MessageHandler = async (stream, consumer, context, message) => // <4> diff --git a/docs/Documentation/Documentation.csproj b/docs/Documentation/Documentation.csproj index 2372da43..f2b8384c 100644 --- a/docs/Documentation/Documentation.csproj +++ b/docs/Documentation/Documentation.csproj @@ -20,7 +20,6 @@ - diff --git a/docs/Documentation/GettingStarted.cs b/docs/Documentation/GettingStarted.cs index 02c3fd6c..3c92ba76 100644 --- a/docs/Documentation/GettingStarted.cs +++ b/docs/Documentation/GettingStarted.cs @@ -121,11 +121,11 @@ await producer.Send( // <6> } - confirmationTaskCompletionSource.Task.Wait(); // <7> + await confirmationTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); // <7> await producer.Close().ConfigureAwait(false); // <8> // end::sample-producer[] - var consumerTaskCompletionSource = new TaskCompletionSource(); + var consumerTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumerCount = 0; // Create a consumer // tag::sample-consumer[] @@ -149,7 +149,7 @@ await producer.Send( // <6> consumerLogger // <4> ) .ConfigureAwait(false); - consumerTaskCompletionSource.Task.Wait(); // <5> + await consumerTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); // <5> await consumer.Close().ConfigureAwait(false); // <6> // end::sample-consumer[] diff --git a/docs/ReliableClient/ReliableClient.csproj b/docs/ReliableClient/ReliableClient.csproj index 27b2351e..da4b0615 100644 --- a/docs/ReliableClient/ReliableClient.csproj +++ b/docs/ReliableClient/ReliableClient.csproj @@ -7,16 +7,17 @@ net8.0;net9.0 - - - - + + - + + + + Always diff --git a/docs/asciidoc/api.adoc b/docs/asciidoc/api.adoc index ddf6d7f4..dd4e3b53 100644 --- a/docs/asciidoc/api.adoc +++ b/docs/asciidoc/api.adoc @@ -5,7 +5,7 @@ ==== Overview This section describes the API to connect to the RabbitMQ Stream Plugin, publish messages, and consume messages. -There are 3 main interfaces: +There are three main interfaces: * `RabbitMQ.Stream.Client` for connecting to a node and optionally managing streams. * `RabbitMQ.Stream.Client.Reliable.Producer` to publish messages. @@ -141,8 +141,9 @@ The following table sums up the main settings to create an `StreamSystem` using [[connection-pool]] ===== Connection pool + Introduced on version 1.8.0. -With the connection pool you can define how many producers and consumers can be created on a single connection and the `ConnectionCloseConfig` +With the connection pool you can define how many producers and consumers can be created on a single connection and the `ConnectionCloseConfig` [source] ---- @@ -164,7 +165,8 @@ A high value can reduce the number of connections to the server but it could red A low value can increase the number of connections to the server but it could increase the performance of the producer and the consumer. -The consumers share the same handler, so if you have a high number of consumers per connection, the handler could be a bottleneck. It means that if there is a slow consumer all the other consumers could be slow. +The consumers share the same handler, so if you have a high number of consumers per connection, the handler could be a bottleneck. +It means that if there is a slow consumer all the other consumers could be slow. TIP: You can use different StreamSystemConfig like: @@ -186,6 +188,7 @@ streamSystemToIncreaseThePerformances = new StreamSystemConfig{ } } ---- + There is not a magic number, you have to test and evaluate the best value for your use case. The `ConnectionCloseConfig` defines the policy to close the connection when the last producer or consumer is closed. @@ -194,7 +197,6 @@ The `ConnectionCloseConfig` defines the policy to close the connection when the The policy `CloseWhenEmpty` covers the standard use cases when the producers or consumers have long life running. - The policy `CloseWhenEmptyAndIdle` is useful when producers or consumers have short live and the pool has to be fast to create a new entity. The parameter `IdleTime` defines the time to wait before closing the connection when the last producer or consumer is closed. The parameter `CheckIdleTime` defines the time to check if the connection is idle. @@ -211,7 +213,6 @@ The parameter `CheckIdleTime` defines the time to check if the connection is idl Note: You can't close the stream systems if there are producers or consumers still running with the `CloseWhenEmptyAndIdle` policy. - [[entity-status]] [[address-resolver]] ===== When a Load Balancer is in Use @@ -396,7 +397,7 @@ This value is valid only for the `Send(Message)` method. |`StatusChanged` |The callback invoked when the producer status changes. See <> for more details. -|`null` +|`null` |=== @@ -815,7 +816,7 @@ The following table sums up the main settings to create a `Consumer` with `Consu |`ICrc32` |The <> to use to validate the chunk server crc32 . -|`null` (no CRC32) +| `StreamCrc32()` |`StatusChanged` |The callback invoked when the consumer status changes. See <> for more details. @@ -836,8 +837,8 @@ See the <> to find out more about the diffe ===== Check the CRC on Delivery RabbitMQ Stream provides a CRC32 checksum on each chunk. -The client library can check the checksum before parse the chunk and throw an `CrcException` exception if the validation fails. -By default the CRC32 checksum is not enabled, to enable it you need to set the `ICrc32` interface in the `ConsumerConfig`: +The client library can check the checksum before parsing the chunk. +By default the CRC32 checksum enabled (with the class `StreamCrc32()` ), to disable it you need to set null to `ICrc32` interface in the `ConsumerConfig`: .Checking the CRC32 checksum on the chunk [source,c#,indent=0] @@ -847,13 +848,30 @@ include::{test-examples}/ConsumerUsage.cs[tag=consumer-creation-crc] <1> An implementation of the `ICrc32` interface. You are free to use any implementation you want. -The client is tested with `System.IO.Hashing`. -`System.IO.Hashing` is not shipped with the client library. -<2> Set the `ICrc32` implementation in the `ConsumerConfig` +By default the client library uses `StreamCrc32` which is a simple implementation of the `ICrc32` interface. +<2> The function FailAction is called when the CRC32 checksum is not valid. + +`FailAction` return types: + +[%header,cols=2*] +|=== +|Parameter Name +|Description -It is recommended to use it. -It _could_ reduce the performance of the consumer. -It depends on the use case. +|`ChunkAction.Skip` +|Skip the chunk and continue consuming the next chunk. + +|`ChunkAction.TryToProcess` +|Try to process the chunk even if the CRC32 checksum is not valid. +|=== + +If `FailAction` is `null` the library will use `ChunkAction.Skip` as default value. + +`FailAction` has `sourceConsumer` as parameter, which is the consumer that failed to validate the CRC32 checksum. +the code in the `FailAction` should be short, fast and safe, as it is executed in the consumer thread. + +It is recommended to leave it enabled, as it allows the client library to detect corrupted chunks and avoid parsing them. +Disabling the CRC32 _could_ improve performance, but it is not recommended as it can lead to parsing corrupted chunks and unexpected behavior. [[consumer-reconnect-strategy]] [[specifying-an-offset]] @@ -940,7 +958,6 @@ include::{test-examples}/ConsumerUsage.cs[tag=manual-tracking-defaults] The snippet above uses `consumer.StoreOffset(context.Offset)` to store at the offset of the current message. It is possible to store the offset in a more generic way with `StreamSystem.StoreOffset(reference,stream, offsetValue)` - ====== Considerations On Offset Tracking _When to store offsets?_ Avoid storing offsets too often or, worse, for each message. @@ -1073,13 +1090,14 @@ include::{test-examples}/ConsumerUsage.cs[tag=sac-consumer-update-listener] <2> Enable single active consumer <3> Handle `ConsumerUpdateListener` callback - [[entity-status]] ==== Producer/Consumer change status callback + `Producer` and `Consumer` classes provide a callback to handle the status change. It is possible to configure the event using the configuration `StatusChanged` property. like the following snippet: + [source,c#,indent=0] -------- var conf = new ConsumerConfig(system, stream) @@ -1148,7 +1166,6 @@ the `statusInfo` contains the following information: A full example of the status change callback can be found in https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient[here]. - [[low-high-level-classes]] ==== Low Level and High Level classes diff --git a/global.json b/global.json index 47a7fa60..af8d7f46 100644 --- a/global.json +++ b/global.json @@ -1,5 +1,6 @@ { "sdk": { - "allowPrerelease": false + "allowPrerelease": false, + "version": "9.0.202" } }