diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml
index 6fccf392..f4a12a5f 100644
--- a/.github/workflows/build-test.yaml
+++ b/.github/workflows/build-test.yaml
@@ -8,6 +8,12 @@ jobs:
name: build/test on windows-latest
runs-on: windows-latest
steps:
+ - name: Clone repository
+ uses: actions/checkout@v4
+ - name: Setup .NET SDK
+ uses: actions/setup-dotnet@v4
+ with:
+ global-json-file: global.json
- uses: actions/checkout@v4
- uses: actions/cache@v4
with:
@@ -35,6 +41,12 @@ jobs:
name: build/test on ubuntu-latest
runs-on: ubuntu-latest
steps:
+ - name: Clone repository
+ uses: actions/checkout@v4
+ - name: Setup .NET SDK
+ uses: actions/setup-dotnet@v4
+ with:
+ global-json-file: global.json
- uses: actions/checkout@v4
- uses: actions/setup-dotnet@v4
with:
diff --git a/Directory.Packages.props b/Directory.Packages.props
index adf37d6d..f89f9b50 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -20,6 +20,7 @@
+
diff --git a/RabbitMQ.Stream.Client/ClientExceptions.cs b/RabbitMQ.Stream.Client/ClientExceptions.cs
index 71d47c6f..95e2db59 100644
--- a/RabbitMQ.Stream.Client/ClientExceptions.cs
+++ b/RabbitMQ.Stream.Client/ClientExceptions.cs
@@ -177,14 +177,6 @@ public UnknownCommandException(string s)
}
}
- public class CrcException : Exception
- {
- public CrcException(string s)
- : base(s)
- {
- }
- }
-
public class TooManyConnectionsException : Exception
{
public TooManyConnectionsException(string s)
diff --git a/RabbitMQ.Stream.Client/IConsumer.cs b/RabbitMQ.Stream.Client/IConsumer.cs
index b44e48b2..c7c6bda3 100644
--- a/RabbitMQ.Stream.Client/IConsumer.cs
+++ b/RabbitMQ.Stream.Client/IConsumer.cs
@@ -71,10 +71,9 @@ public ushort InitialCredits
}
}
- // enables the check of the crc on the delivery.
- // the server will send the crc for each chunk and the client will check it.
- // It is not enabled by default because it is could reduce the performance.
- public ICrc32 Crc32 { get; set; } = null;
+ // It is enabled by default. You can disable it by setting it to null.
+ // It is recommended to keep it enabled. Disable it only for performance reasons.
+ public ICrc32 Crc32 { get; set; } = new StreamCrc32();
}
public class ConsumerInfo : Info
@@ -90,6 +89,7 @@ public ConsumerInfo(string stream, string reference, string identifier, List
+ /// The consumer will try to process the Chunk.
+ ///
+ TryToProcess,
+
+ ///
+ /// The consumer will skip the Chunk and continue processing the next Chunk.
+ /// All the messages in the Chunk will be skipped.
+ ///
+ Skip
+ }
+
///
/// ICrc32 defines an interface for implementing crc32 hashing.
/// Library users who wish to perform crc32 checks on data from RabbitMQ
@@ -13,5 +29,13 @@ namespace RabbitMQ.Stream.Client
public interface ICrc32
{
byte[] Hash(byte[] data);
+
+ ///
+ /// FailAction is called when the Crc32 check fails.
+ /// The user can assign a function that returns a .
+ /// It is possible to add custom logic to handle the failure, such as logging.
+ /// The code here should be safe
+ ///
+ Func FailAction { get; set; }
}
}
diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
index e66563b2..a89ca90f 100644
--- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
+++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
@@ -35,6 +35,9 @@ RabbitMQ.Stream.Client.BindingsSuperStreamSpec.BindingsSuperStreamSpec(string Na
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
+RabbitMQ.Stream.Client.ChunkAction
+RabbitMQ.Stream.Client.ChunkAction.Skip = 1 -> RabbitMQ.Stream.Client.ChunkAction
+RabbitMQ.Stream.Client.ChunkAction.TryToProcess = 0 -> RabbitMQ.Stream.Client.ChunkAction
RabbitMQ.Stream.Client.Client.ClientId.get -> string
RabbitMQ.Stream.Client.Client.ClientId.init -> void
RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary
@@ -109,8 +112,6 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier, System.Collections.Generic.List partitions) -> void
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
-RabbitMQ.Stream.Client.CrcException
-RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.CreateConsumerException.CreateConsumerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void
RabbitMQ.Stream.Client.CreateException
RabbitMQ.Stream.Client.CreateException.CreateException(string s) -> void
@@ -175,6 +176,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
+RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func
+RabbitMQ.Stream.Client.ICrc32.FailAction.set -> void
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Identifier.get -> string
@@ -322,6 +325,11 @@ RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Span span) -> int
RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType
+RabbitMQ.Stream.Client.StreamCrc32
+RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func
+RabbitMQ.Stream.Client.StreamCrc32.FailAction.set -> void
+RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[]
+RabbitMQ.Stream.Client.StreamCrc32.StreamCrc32() -> void
RabbitMQ.Stream.Client.StreamStats
RabbitMQ.Stream.Client.StreamStats.CommittedChunkId() -> ulong
RabbitMQ.Stream.Client.StreamStats.FirstOffset() -> ulong
diff --git a/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj b/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj
index 70ea8824..081a7edc 100644
--- a/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj
+++ b/RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj
@@ -39,6 +39,7 @@
+
diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs
index f34b367f..54c3d53f 100644
--- a/RabbitMQ.Stream.Client/RawConsumer.cs
+++ b/RabbitMQ.Stream.Client/RawConsumer.cs
@@ -130,14 +130,16 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable
{
private readonly RawConsumerConfig _config;
- private readonly Channel _chunksBuffer;
+ private readonly Channel<(Chunk, ChunkAction)> _chunksBuffer;
+
private readonly ushort _initialCredits;
// _completeSubscription is used to notify the ProcessChunks task
// that the subscription is completed and so it can start to process the chunks
// this is needed because the socket starts to receive the chunks before the subscription_id is
// assigned.
- private readonly TaskCompletionSource _completeSubscription = new();
+ private readonly TaskCompletionSource _completeSubscription =
+ new(TaskCreationOptions.RunContinuationsAsynchronously);
protected sealed override string DumpEntityConfiguration()
{
@@ -161,7 +163,8 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration());
Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null);
// _chunksBuffer is a channel that is used to buffer the chunks
- _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits)
+
+ _chunksBuffer = Channel.CreateBounded<(Chunk, ChunkAction)>(new BoundedChannelOptions(_initialCredits)
{
AllowSynchronousContinuations = false,
SingleReader = true,
@@ -220,7 +223,6 @@ public async Task StoreOffset(ulong offset)
/// MaybeLockDispatch is an optimization to avoid to lock the dispatch
/// when the consumer is not single active consumer
///
-
private async Task MaybeLockDispatch()
{
if (_config.IsSingleActiveConsumer)
@@ -266,8 +268,8 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int
var slice = unCompressedData.Slice(compressOffset, 4);
compressOffset += WireFormatting.ReadUInt32(ref slice, out var len);
Debug.Assert(len > 0);
- slice = unCompressedData.Slice(compressOffset, len);
- Debug.Assert(slice.Length >= len);
+ var sliceMsg = unCompressedData.Slice(compressOffset, len);
+ Debug.Assert(sliceMsg.Length == len);
compressOffset += (int)len;
// Here we use the Message.From(ref ReadOnlySequence seq ..) method to parse the message
@@ -275,7 +277,7 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int
// Since the ParseChunk is async and we cannot use the ref SequenceReader reader
// See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 for more details
- var message = Message.From(ref slice, len);
+ var message = Message.From(ref sliceMsg, len);
return message;
}
catch (Exception e)
@@ -461,15 +463,18 @@ private void ProcessChunks()
// need to wait the subscription is completed
// else the _subscriberId could be incorrect
_completeSubscription.Task.Wait();
+
try
{
while (!Token.IsCancellationRequested &&
await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
{
- while (_chunksBuffer.Reader.TryRead(out var chunk))
+ while (_chunksBuffer.Reader.TryRead(out var chunkWithAction))
{
// We send the credit to the server to allow the server to send more messages
// we request the credit before process the check to keep the network busy
+
+ var (chunk, action) = chunkWithAction;
try
{
if (Token.IsCancellationRequested)
@@ -498,8 +503,21 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
// and close the task
if (Token.IsCancellationRequested)
break;
-
- await ParseChunk(chunk).ConfigureAwait(false);
+ switch (action)
+ {
+ case ChunkAction.Skip:
+ // the chunk will be skipped due of CRC32 fail
+ Logger?.LogWarning(
+ "The chunk {ChunkId} will be skipped for {EntityInfo}",
+ chunk.ChunkId, DumpEntityConfiguration());
+ continue; // skip the chunk
+ case ChunkAction.TryToProcess:
+ // That's what happens most of the time, and this is the default action
+ await ParseChunk(chunk).ConfigureAwait(false);
+ break;
+ default:
+ throw new ArgumentOutOfRangeException();
+ }
}
}
@@ -593,6 +611,8 @@ private async Task Init()
return;
}
+ var chunkAction = ChunkAction.TryToProcess;
+
if (_config.Crc32 is not null)
{
var crcCalculated = BitConverter.ToUInt32(
@@ -606,13 +626,17 @@ private async Task Init()
DumpEntityConfiguration(),
chunkConsumed);
- throw new CrcException(
- $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " +
- $"Chunk Consumed {chunkConsumed}");
+ // if the user has set the FailAction, we call it
+ // to allow the user to handle the chunk action
+ // if the FailAction is not set, we skip the chunk
+ chunkAction = _config.Crc32.FailAction?.Invoke(this) ?? ChunkAction.Skip;
}
}
- await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
+ // The chunkAction is passed to the _chunksBuffer because the ProcessChunks task
+ // asks for the credits in a Task. If we skip the chunk here no more credits will be requested
+ await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, chunkAction), Token)
+ .ConfigureAwait(false);
}
catch (OperationCanceledException)
{
diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
index bd9950d1..b73095a4 100644
--- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
+++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
@@ -61,7 +61,11 @@ private async Task StandardConsumer(bool boot)
ConnectionClosedHandler = async (closeReason) =>
{
if (IsClosedNormally(closeReason))
+ {
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
+ [_consumerConfig.Stream]);
return;
+ }
try
{
@@ -153,7 +157,12 @@ private async Task SuperConsumer(bool boot)
ConnectionClosedHandler = async (closeReason, partitionStream) =>
{
if (IsClosedNormally(closeReason))
+ {
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
+ [partitionStream]);
return;
+ }
+
await RandomWait().ConfigureAwait(false);
try
{
diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
index 2d173326..0e3c7ae8 100644
--- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
+++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
@@ -50,7 +50,12 @@ private async Task SuperStreamProducer(bool boot)
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally(closeReason))
+ {
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
+ [partitionStream]);
return;
+ }
+
try
{
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
@@ -121,7 +126,11 @@ private async Task StandardProducer()
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally())
+ {
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
+ [_producerConfig.Stream]);
return;
+ }
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
diff --git a/RabbitMQ.Stream.Client/StreamCrc32.cs b/RabbitMQ.Stream.Client/StreamCrc32.cs
new file mode 100644
index 00000000..592890fc
--- /dev/null
+++ b/RabbitMQ.Stream.Client/StreamCrc32.cs
@@ -0,0 +1,23 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+
+using System;
+
+namespace RabbitMQ.Stream.Client;
+
+///
+/// Default implementation of using the System.IO.Hashing.Crc32 class.
+/// The consumer uses this implementation by default to perform CRC32 checks on chunk received from the server.
+/// FailAction is set to null by default, meaning that no custom action is taken when the CRC32 check fails.
+/// It uses the default action of skipping the chunk.
+///
+public class StreamCrc32 : ICrc32
+{
+ public byte[] Hash(byte[] data)
+ {
+ return System.IO.Hashing.Crc32.Hash(data);
+ }
+
+ public Func FailAction { get; set; } = null;
+}
diff --git a/Tests/Crc32.cs b/Tests/Crc32.cs
deleted file mode 100644
index 7d7b3df3..00000000
--- a/Tests/Crc32.cs
+++ /dev/null
@@ -1,15 +0,0 @@
-// This source code is dual-licensed under the Apache License, version
-// 2.0, and the Mozilla Public License, version 2.0.
-// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
-
-using RabbitMQ.Stream.Client;
-
-namespace Tests;
-
-public class Crc32 : ICrc32
-{
- public byte[] Hash(byte[] data)
- {
- return System.IO.Hashing.Crc32.Hash(data);
- }
-}
diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs
new file mode 100644
index 00000000..d2fa4575
--- /dev/null
+++ b/Tests/Crc32Tests.cs
@@ -0,0 +1,164 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+
+using System;
+using System.Threading.Tasks;
+using RabbitMQ.Stream.Client;
+using RabbitMQ.Stream.Client.Reliable;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Tests;
+
+public class FirstCrc32IsWrong : ICrc32
+{
+ private int _callCount = 0;
+
+ public byte[] Hash(byte[] data)
+ {
+ _callCount++;
+ return _callCount == 1
+ ?
+ // Return a wrong hash on the first call to simulate a CRC32 failure
+ // second call will return the correct hash
+ [0x00, 0x00, 0x00, 0x04]
+ : System.IO.Hashing.Crc32.Hash(data);
+ }
+
+ public Func FailAction { get; set; }
+}
+
+public class Crc32Tests(ITestOutputHelper testOutputHelper)
+{
+ ///
+ /// Tests the Crc32 functionality of the consumer.
+ /// In this case, the first Crc32 is wrong, so the consumer should skip the chunk.
+ /// So the consumer should receive the second chunk.
+ ///
+ [Fact]
+ public async Task Crc32ShouldSkipChunk()
+ {
+ SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
+ var completionSource = new TaskCompletionSource();
+ var consumerConfig = new ConsumerConfig(system, stream)
+ {
+ Crc32 = new FirstCrc32IsWrong() { FailAction = (_) => ChunkAction.Skip },
+ InitialCredits = 1,
+ MessageHandler = (_, _, messageContext, _) =>
+ {
+ completionSource.TrySetResult(messageContext);
+ return Task.CompletedTask;
+ }
+ };
+ var consumer = await Consumer.Create(consumerConfig);
+ await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper);
+ await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(700));
+ await SystemUtils.PublishMessages(system, stream, 5, "2", testOutputHelper);
+ var messageContext = await completionSource.Task;
+ if (messageContext.ChunkId == 0)
+ {
+ Assert.Fail($"Expected the second chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}");
+ }
+
+ Assert.True(consumer.IsOpen());
+ await consumer.Close();
+ await SystemUtils.CleanUpStreamSystem(system, stream);
+ }
+
+ [Fact]
+ public async Task Crc32ShouldSkipChunkWithTheDefaultBehavior()
+ {
+ SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
+ var completionSource = new TaskCompletionSource();
+ var consumerConfig = new ConsumerConfig(system, stream)
+ {
+ // FailAction is not set, so it will use the default behavior which is to skip the chunk
+ Crc32 = new FirstCrc32IsWrong() { },
+ InitialCredits = 1,
+ MessageHandler = (_, _, messageContext, _) =>
+ {
+ completionSource.TrySetResult(messageContext);
+ return Task.CompletedTask;
+ }
+ };
+ var consumer = await Consumer.Create(consumerConfig);
+ await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper);
+ await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(700));
+ await SystemUtils.PublishMessages(system, stream, 5, "2", testOutputHelper);
+ var messageContext = await completionSource.Task;
+ if (messageContext.ChunkId == 0)
+ {
+ Assert.Fail($"Expected the second chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}");
+ }
+
+ Assert.True(consumer.IsOpen());
+ await consumer.Close();
+ await SystemUtils.CleanUpStreamSystem(system, stream);
+ }
+
+ ///
+ /// Tests the Crc32 functionality of the consumer.
+ /// Simulate a CRC32 failure on the first chunk,
+ /// and add a custom action to close the consumer.
+ /// the consumer should be closed and the chunk should be skipped.
+ ///
+ [Fact]
+ public async Task Crc32ShouldCloseTheConsumer()
+ {
+ SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
+ var consumerConfig = new ConsumerConfig(system, stream)
+ {
+ Crc32 = new FirstCrc32IsWrong()
+ {
+ FailAction = consumer =>
+ {
+ consumer.Close();
+ return ChunkAction.Skip; // Skip the chunk and close the consumer
+ }
+ },
+ MessageHandler = (_, _, _, _) => Task.CompletedTask
+ };
+
+ var consumer = await Consumer.Create(consumerConfig);
+ await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper);
+ await SystemUtils.WaitAsync(TimeSpan.FromMilliseconds(500));
+ await SystemUtils.WaitUntilAsync(() => !consumer.IsOpen());
+ await SystemUtils.CleanUpStreamSystem(system, stream);
+ }
+
+ ///
+ /// Here we test an edge case where Crc32 is wrong,
+ /// but the consumer should still process the chunk.
+ /// by given the FailAction as TryToProcess.
+ /// In real life when a CRC32 is wrong the consumer can't process the chunk,
+ /// this is to give more flexibility to the user.
+ ///
+ [Fact]
+ public async Task Crc32ShouldParseTheChunk()
+ {
+ SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
+ var completionSource = new TaskCompletionSource();
+ var consumerConfig = new ConsumerConfig(system, stream)
+ {
+ Crc32 = new FirstCrc32IsWrong() { FailAction = (_) => ChunkAction.TryToProcess },
+ MessageHandler = (_, _, messageContext, _) =>
+ {
+ completionSource.TrySetResult(messageContext);
+ return Task.CompletedTask;
+ }
+ };
+ var consumer = await Consumer.Create(consumerConfig);
+ await SystemUtils.PublishMessages(system, stream, 3, "1", testOutputHelper);
+ await SystemUtils.WaitAsync();
+ var messageContext = await completionSource.Task;
+ if (messageContext.ChunkId != 0)
+ {
+ Assert.Fail($"Expected the first chunk to be processed, but it was not.ChunkId {messageContext.ChunkId}");
+ }
+
+ Assert.True(consumer.IsOpen());
+ await consumer.Close();
+ await SystemUtils.CleanUpStreamSystem(system, stream);
+ }
+}
diff --git a/Tests/RawConsumerSystemTests.cs b/Tests/RawConsumerSystemTests.cs
index 7328ea6e..b208feec 100644
--- a/Tests/RawConsumerSystemTests.cs
+++ b/Tests/RawConsumerSystemTests.cs
@@ -20,7 +20,7 @@ namespace Tests
{
public class ConsumerSystemTests
{
- private readonly ICrc32 _crc32 = new Crc32();
+ private readonly ICrc32 _crc32 = new StreamCrc32();
private readonly ITestOutputHelper testOutputHelper;
public ConsumerSystemTests(ITestOutputHelper testOutputHelper)
diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs
index c44acd9c..1d4eace3 100644
--- a/Tests/ReliableTests.cs
+++ b/Tests/ReliableTests.cs
@@ -18,7 +18,7 @@ namespace Tests;
public class ReliableTests
{
- private readonly ICrc32 _crc32 = new Crc32();
+ private readonly ICrc32 _crc32 = new StreamCrc32() { };
private readonly ITestOutputHelper _testOutputHelper;
public ReliableTests(ITestOutputHelper testOutputHelper)
@@ -48,7 +48,8 @@ public async Task MessageWithoutConfirmationRaiseTimeout()
);
confirmationPipe.Start();
confirmationPipe.AddUnConfirmedMessage(1, new Message(Encoding.UTF8.GetBytes($"hello")));
- confirmationPipe.AddUnConfirmedMessage(2, new List() { new Message(Encoding.UTF8.GetBytes($"hello")) });
+ confirmationPipe.AddUnConfirmedMessage(2,
+ new List() { new Message(Encoding.UTF8.GetBytes($"hello")) });
new Utils(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask);
Assert.Equal(2, await confirmationTask.Task);
Assert.Equal(2, l.Count);
diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs
index ac45fdbc..d1d1dc92 100644
--- a/Tests/SuperStreamConsumerTests.cs
+++ b/Tests/SuperStreamConsumerTests.cs
@@ -19,7 +19,6 @@ namespace Tests;
public class SuperStreamConsumerTests
{
- private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper _testOutputHelper;
public SuperStreamConsumerTests(ITestOutputHelper testOutputHelper)
@@ -91,7 +90,6 @@ public async Task NumberOfMessagesConsumedShouldBeEqualsToPublished()
var consumer = await system.CreateSuperStreamConsumer(
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange)
{
- Crc32 = _crc32,
ClientProvidedName = clientProvidedName,
Identifier = "super_stream_consumer_24680",
OffsetSpec =
diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj
index 5db4744b..9342b265 100644
--- a/Tests/Tests.csproj
+++ b/Tests/Tests.csproj
@@ -6,8 +6,8 @@
Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
Broadcom
Broadcom Inc. and/or its subsidiaries.
- net8.0;net9.0
false
+ net8.0
@@ -15,7 +15,6 @@
-
diff --git a/docs/Documentation/ConsumerUsage.cs b/docs/Documentation/ConsumerUsage.cs
index fcc9abc1..913da05c 100644
--- a/docs/Documentation/ConsumerUsage.cs
+++ b/docs/Documentation/ConsumerUsage.cs
@@ -128,15 +128,7 @@ public static async Task CreateConsumerSingleUpdateListener()
await streamSystem.Close().ConfigureAwait(false);
}
- // tag::consumer-creation-crc[]
- private class UserCrc32 : ICrc32 // <1>
- {
- public byte[] Hash(byte[] data)
- {
- // Here we use the System.IO.Hashing.Crc32 implementation
- return System.IO.Hashing.Crc32.Hash(data);
- }
- }
+
public static async Task CreateConsumerWithCrc()
{
@@ -148,7 +140,10 @@ public static async Task CreateConsumerWithCrc()
streamSystem,
"my-stream")
{
- Crc32 = new UserCrc32(), // <2>
+ Crc32 = new StreamCrc32() // <1>
+ {
+ FailAction = (consumerInstance) => ChunkAction.Skip // <2>
+ },
OffsetSpec = new OffsetTypeTimestamp(),
// end::consumer-creation-crc[]
MessageHandler = async (stream, consumer, context, message) => // <4>
diff --git a/docs/Documentation/Documentation.csproj b/docs/Documentation/Documentation.csproj
index 2372da43..f2b8384c 100644
--- a/docs/Documentation/Documentation.csproj
+++ b/docs/Documentation/Documentation.csproj
@@ -20,7 +20,6 @@
-
diff --git a/docs/Documentation/GettingStarted.cs b/docs/Documentation/GettingStarted.cs
index 02c3fd6c..3c92ba76 100644
--- a/docs/Documentation/GettingStarted.cs
+++ b/docs/Documentation/GettingStarted.cs
@@ -121,11 +121,11 @@ await producer.Send( // <6>
}
- confirmationTaskCompletionSource.Task.Wait(); // <7>
+ await confirmationTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); // <7>
await producer.Close().ConfigureAwait(false); // <8>
// end::sample-producer[]
- var consumerTaskCompletionSource = new TaskCompletionSource();
+ var consumerTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var consumerCount = 0;
// Create a consumer
// tag::sample-consumer[]
@@ -149,7 +149,7 @@ await producer.Send( // <6>
consumerLogger // <4>
)
.ConfigureAwait(false);
- consumerTaskCompletionSource.Task.Wait(); // <5>
+ await consumerTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); // <5>
await consumer.Close().ConfigureAwait(false); // <6>
// end::sample-consumer[]
diff --git a/docs/ReliableClient/ReliableClient.csproj b/docs/ReliableClient/ReliableClient.csproj
index 27b2351e..da4b0615 100644
--- a/docs/ReliableClient/ReliableClient.csproj
+++ b/docs/ReliableClient/ReliableClient.csproj
@@ -7,16 +7,17 @@
net8.0;net9.0
-
-
-
-
+
+
-
+
+
+
+
Always
diff --git a/docs/asciidoc/api.adoc b/docs/asciidoc/api.adoc
index ddf6d7f4..dd4e3b53 100644
--- a/docs/asciidoc/api.adoc
+++ b/docs/asciidoc/api.adoc
@@ -5,7 +5,7 @@
==== Overview
This section describes the API to connect to the RabbitMQ Stream Plugin, publish messages, and consume messages.
-There are 3 main interfaces:
+There are three main interfaces:
* `RabbitMQ.Stream.Client` for connecting to a node and optionally managing streams.
* `RabbitMQ.Stream.Client.Reliable.Producer` to publish messages.
@@ -141,8 +141,9 @@ The following table sums up the main settings to create an `StreamSystem` using
[[connection-pool]]
===== Connection pool
+
Introduced on version 1.8.0.
-With the connection pool you can define how many producers and consumers can be created on a single connection and the `ConnectionCloseConfig`
+With the connection pool you can define how many producers and consumers can be created on a single connection and the `ConnectionCloseConfig`
[source]
----
@@ -164,7 +165,8 @@ A high value can reduce the number of connections to the server but it could red
A low value can increase the number of connections to the server but it could increase the performance of the producer and the consumer.
-The consumers share the same handler, so if you have a high number of consumers per connection, the handler could be a bottleneck. It means that if there is a slow consumer all the other consumers could be slow.
+The consumers share the same handler, so if you have a high number of consumers per connection, the handler could be a bottleneck.
+It means that if there is a slow consumer all the other consumers could be slow.
TIP:
You can use different StreamSystemConfig like:
@@ -186,6 +188,7 @@ streamSystemToIncreaseThePerformances = new StreamSystemConfig{
}
}
----
+
There is not a magic number, you have to test and evaluate the best value for your use case.
The `ConnectionCloseConfig` defines the policy to close the connection when the last producer or consumer is closed.
@@ -194,7 +197,6 @@ The `ConnectionCloseConfig` defines the policy to close the connection when the
The policy `CloseWhenEmpty` covers the standard use cases when the producers or consumers have long life running.
-
The policy `CloseWhenEmptyAndIdle` is useful when producers or consumers have short live and the pool has to be fast to create a new entity.
The parameter `IdleTime` defines the time to wait before closing the connection when the last producer or consumer is closed.
The parameter `CheckIdleTime` defines the time to check if the connection is idle.
@@ -211,7 +213,6 @@ The parameter `CheckIdleTime` defines the time to check if the connection is idl
Note: You can't close the stream systems if there are producers or consumers still running with the `CloseWhenEmptyAndIdle` policy.
-
[[entity-status]]
[[address-resolver]]
===== When a Load Balancer is in Use
@@ -396,7 +397,7 @@ This value is valid only for the `Send(Message)` method.
|`StatusChanged`
|The callback invoked when the producer status changes. See <> for more details.
-|`null`
+|`null`
|===
@@ -815,7 +816,7 @@ The following table sums up the main settings to create a `Consumer` with `Consu
|`ICrc32`
|The <> to use to validate the chunk server crc32 .
-|`null` (no CRC32)
+| `StreamCrc32()`
|`StatusChanged`
|The callback invoked when the consumer status changes. See <> for more details.
@@ -836,8 +837,8 @@ See the <> to find out more about the diffe
===== Check the CRC on Delivery
RabbitMQ Stream provides a CRC32 checksum on each chunk.
-The client library can check the checksum before parse the chunk and throw an `CrcException` exception if the validation fails.
-By default the CRC32 checksum is not enabled, to enable it you need to set the `ICrc32` interface in the `ConsumerConfig`:
+The client library can check the checksum before parsing the chunk.
+By default the CRC32 checksum enabled (with the class `StreamCrc32()` ), to disable it you need to set null to `ICrc32` interface in the `ConsumerConfig`:
.Checking the CRC32 checksum on the chunk
[source,c#,indent=0]
@@ -847,13 +848,30 @@ include::{test-examples}/ConsumerUsage.cs[tag=consumer-creation-crc]
<1> An implementation of the `ICrc32` interface.
You are free to use any implementation you want.
-The client is tested with `System.IO.Hashing`.
-`System.IO.Hashing` is not shipped with the client library.
-<2> Set the `ICrc32` implementation in the `ConsumerConfig`
+By default the client library uses `StreamCrc32` which is a simple implementation of the `ICrc32` interface.
+<2> The function FailAction is called when the CRC32 checksum is not valid.
+
+`FailAction` return types:
+
+[%header,cols=2*]
+|===
+|Parameter Name
+|Description
-It is recommended to use it.
-It _could_ reduce the performance of the consumer.
-It depends on the use case.
+|`ChunkAction.Skip`
+|Skip the chunk and continue consuming the next chunk.
+
+|`ChunkAction.TryToProcess`
+|Try to process the chunk even if the CRC32 checksum is not valid.
+|===
+
+If `FailAction` is `null` the library will use `ChunkAction.Skip` as default value.
+
+`FailAction` has `sourceConsumer` as parameter, which is the consumer that failed to validate the CRC32 checksum.
+the code in the `FailAction` should be short, fast and safe, as it is executed in the consumer thread.
+
+It is recommended to leave it enabled, as it allows the client library to detect corrupted chunks and avoid parsing them.
+Disabling the CRC32 _could_ improve performance, but it is not recommended as it can lead to parsing corrupted chunks and unexpected behavior.
[[consumer-reconnect-strategy]]
[[specifying-an-offset]]
@@ -940,7 +958,6 @@ include::{test-examples}/ConsumerUsage.cs[tag=manual-tracking-defaults]
The snippet above uses `consumer.StoreOffset(context.Offset)` to store at the offset of the current message.
It is possible to store the offset in a more generic way with `StreamSystem.StoreOffset(reference,stream, offsetValue)`
-
====== Considerations On Offset Tracking
_When to store offsets?_ Avoid storing offsets too often or, worse, for each message.
@@ -1073,13 +1090,14 @@ include::{test-examples}/ConsumerUsage.cs[tag=sac-consumer-update-listener]
<2> Enable single active consumer
<3> Handle `ConsumerUpdateListener` callback
-
[[entity-status]]
==== Producer/Consumer change status callback
+
`Producer` and `Consumer` classes provide a callback to handle the status change.
It is possible to configure the event using the configuration `StatusChanged` property.
like the following snippet:
+
[source,c#,indent=0]
--------
var conf = new ConsumerConfig(system, stream)
@@ -1148,7 +1166,6 @@ the `statusInfo` contains the following information:
A full example of the status change callback can be found in https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient[here].
-
[[low-high-level-classes]]
==== Low Level and High Level classes
diff --git a/global.json b/global.json
index 47a7fa60..af8d7f46 100644
--- a/global.json
+++ b/global.json
@@ -1,5 +1,6 @@
{
"sdk": {
- "allowPrerelease": false
+ "allowPrerelease": false,
+ "version": "9.0.202"
}
}