Skip to content

Improve parse chunk consumer side #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ jobs:
name: build/test on windows-latest
runs-on: windows-latest
steps:
- name: Clone repository
uses: actions/checkout@v4
- name: Setup .NET SDK
uses: actions/setup-dotnet@v4
with:
global-json-file: global.json
- uses: actions/checkout@v4
- uses: actions/cache@v4
with:
Expand Down Expand Up @@ -35,6 +41,12 @@ jobs:
name: build/test on ubuntu-latest
runs-on: ubuntu-latest
steps:
- name: Clone repository
uses: actions/checkout@v4
- name: Setup .NET SDK
uses: actions/setup-dotnet@v4
with:
global-json-file: global.json
- uses: actions/checkout@v4
- uses: actions/setup-dotnet@v4
with:
Expand Down
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<PackageVersion Include="K4os.Compression.LZ4.Streams" Version="1.2.16" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="9.0.6" />
</ItemGroup>
<ItemGroup Label="net8.0 specific" Condition="'$(TargetFramework)' == 'net8.0'">
<!-- RabbitMQ.Stream.Client -->
Expand Down
8 changes: 0 additions & 8 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,6 @@ public UnknownCommandException(string s)
}
}

/// <summary>
/// Exception used to report a CRC32 mismatch on a chunk received from the server.
/// </summary>
public class CrcException : Exception
{
/// <summary>
/// Creates the exception with the given message.
/// </summary>
/// <param name="s">Human-readable description of the CRC failure; forwarded to <see cref="Exception"/>.</param>
public CrcException(string s)
: base(s)
{
}
}

public class TooManyConnectionsException : Exception
{
public TooManyConnectionsException(string s)
Expand Down
10 changes: 5 additions & 5 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,9 @@ public ushort InitialCredits
}
}

// enables the check of the crc on the delivery.
// the server will send the crc for each chunk and the client will check it.
// It is not enabled by default because it is could reduce the performance.
public ICrc32 Crc32 { get; set; } = null;
// It is enabled by default. You can disable it by setting it to null.
// It is recommended to keep it enabled. Disable it only for performance reasons.
public ICrc32 Crc32 { get; set; } = new StreamCrc32();
}

public class ConsumerInfo : Info
Expand All @@ -90,6 +89,7 @@ public ConsumerInfo(string stream, string reference, string identifier, List<str
public override string ToString()
{
var partitions = Partitions ?? [];
return $"ConsumerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier}, Partitions={string.Join(",", partitions)})";
return
$"ConsumerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier}, Partitions={string.Join(",", partitions)})";
}
}
24 changes: 24 additions & 0 deletions RabbitMQ.Stream.Client/ICrc32.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,24 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;

namespace RabbitMQ.Stream.Client
{
/// <summary>
/// Tells the consumer what to do with a chunk once the CRC32 check has been evaluated.
/// </summary>
public enum ChunkAction
{
    /// <summary>
    /// Default action: the consumer attempts to parse the chunk and dispatch its messages.
    /// </summary>
    TryToProcess = 0,

    /// <summary>
    /// The consumer drops the whole chunk (every message it contains) and
    /// moves on to the next chunk.
    /// </summary>
    Skip = 1
}

/// <summary>
/// ICrc32 defines an interface for implementing crc32 hashing.
/// Library users who wish to perform crc32 checks on data from RabbitMQ
Expand All @@ -13,5 +29,13 @@ namespace RabbitMQ.Stream.Client
public interface ICrc32
{
/// <summary>
/// Computes the CRC32 checksum of <paramref name="data"/>.
/// </summary>
/// <param name="data">The bytes to hash.</param>
/// <returns>
/// The checksum as a byte array.
/// NOTE(review): the consumer appears to read the result with BitConverter.ToUInt32,
/// so custom implementations presumably need to return at least 4 bytes in the
/// platform byte order — confirm against RawConsumer.
/// </returns>
byte[] Hash(byte[] data);

/// <summary>
/// Callback invoked by the consumer when the locally computed CRC32 does not match
/// the CRC sent by the server for a chunk.
/// The delegate receives the consumer that detected the mismatch and returns the
/// <see cref="ChunkAction"/> to apply to that chunk.
/// Custom logic, such as logging or metrics, can be added here.
/// When the property is null, the consumer skips the chunk (<see cref="ChunkAction.Skip"/>).
/// The delegate is invoked inline while the delivery is being handled, so it should
/// return quickly and should not throw.
/// </summary>
Func<IConsumer, ChunkAction> FailAction { get; set; }
}
}
12 changes: 10 additions & 2 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ RabbitMQ.Stream.Client.BindingsSuperStreamSpec.BindingsSuperStreamSpec(string Na
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory<byte>
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.ChunkAction
RabbitMQ.Stream.Client.ChunkAction.Skip = 1 -> RabbitMQ.Stream.Client.ChunkAction
RabbitMQ.Stream.Client.ChunkAction.TryToProcess = 0 -> RabbitMQ.Stream.Client.ChunkAction
RabbitMQ.Stream.Client.Client.ClientId.get -> string
RabbitMQ.Stream.Client.Client.ClientId.init -> void
RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
Expand Down Expand Up @@ -109,8 +112,6 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier, System.Collections.Generic.List<string> partitions) -> void
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.CreateConsumerException.CreateConsumerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void
RabbitMQ.Stream.Client.CreateException
RabbitMQ.Stream.Client.CreateException.CreateException(string s) -> void
Expand Down Expand Up @@ -175,6 +176,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
RabbitMQ.Stream.Client.ICrc32.FailAction.set -> void
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Identifier.get -> string
Expand Down Expand Up @@ -322,6 +325,11 @@ RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.StreamCrc32
RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
RabbitMQ.Stream.Client.StreamCrc32.FailAction.set -> void
RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.StreamCrc32.StreamCrc32() -> void
RabbitMQ.Stream.Client.StreamStats
RabbitMQ.Stream.Client.StreamStats.CommittedChunkId() -> ulong
RabbitMQ.Stream.Client.StreamStats.FirstOffset() -> ulong
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" PrivateAssets="all" />
<PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="all" />
<PackageReference Include="MinVer" PrivateAssets="all" />
<PackageReference Include="System.IO.Hashing" />
<PackageReference Include="System.IO.Pipelines" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
</ItemGroup>
Expand Down
52 changes: 38 additions & 14 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,16 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable
{
private readonly RawConsumerConfig _config;

private readonly Channel<Chunk> _chunksBuffer;
private readonly Channel<(Chunk, ChunkAction)> _chunksBuffer;

private readonly ushort _initialCredits;

// _completeSubscription is used to notify the ProcessChunks task
// that the subscription is completed and so it can start to process the chunks
// this is needed because the socket starts to receive the chunks before the subscription_id is
// assigned.
private readonly TaskCompletionSource _completeSubscription = new();
private readonly TaskCompletionSource _completeSubscription =
new(TaskCreationOptions.RunContinuationsAsynchronously);

protected sealed override string DumpEntityConfiguration()
{
Expand All @@ -161,7 +163,8 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration());
Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null);
// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)

_chunksBuffer = Channel.CreateBounded<(Chunk, ChunkAction)>(new BoundedChannelOptions(_initialCredits)
{
AllowSynchronousContinuations = false,
SingleReader = true,
Expand Down Expand Up @@ -220,7 +223,6 @@ public async Task StoreOffset(ulong offset)
/// MaybeLockDispatch is an optimization to avoid to lock the dispatch
/// when the consumer is not single active consumer
/// </summary>

private async Task MaybeLockDispatch()
{
if (_config.IsSingleActiveConsumer)
Expand Down Expand Up @@ -266,16 +268,16 @@ Message MessageFromSequence(ref ReadOnlySequence<byte> unCompressedData, ref int
var slice = unCompressedData.Slice(compressOffset, 4);
compressOffset += WireFormatting.ReadUInt32(ref slice, out var len);
Debug.Assert(len > 0);
slice = unCompressedData.Slice(compressOffset, len);
Debug.Assert(slice.Length >= len);
var sliceMsg = unCompressedData.Slice(compressOffset, len);
Debug.Assert(sliceMsg.Length == len);
compressOffset += (int)len;

// Here we use the Message.From(ref ReadOnlySequence<byte> seq ..) method to parse the message
// instead of the Message From(ref SequenceReader<byte> reader ..) method
// Since the ParseChunk is async and we cannot use the ref SequenceReader<byte> reader
// See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 for more details

var message = Message.From(ref slice, len);
var message = Message.From(ref sliceMsg, len);
return message;
}
catch (Exception e)
Expand Down Expand Up @@ -461,15 +463,18 @@ private void ProcessChunks()
// need to wait the subscription is completed
// else the _subscriberId could be incorrect
_completeSubscription.Task.Wait();

try
{
while (!Token.IsCancellationRequested &&
await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
{
while (_chunksBuffer.Reader.TryRead(out var chunk))
while (_chunksBuffer.Reader.TryRead(out var chunkWithAction))
{
// We send the credit to the server to allow the server to send more messages
// we request the credit before process the check to keep the network busy

var (chunk, action) = chunkWithAction;
try
{
if (Token.IsCancellationRequested)
Expand Down Expand Up @@ -498,8 +503,21 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
// and close the task
if (Token.IsCancellationRequested)
break;

await ParseChunk(chunk).ConfigureAwait(false);
switch (action)
{
case ChunkAction.Skip:
// the chunk will be skipped due to a CRC32 failure
Logger?.LogWarning(
"The chunk {ChunkId} will be skipped for {EntityInfo}",
chunk.ChunkId, DumpEntityConfiguration());
continue; // skip the chunk
case ChunkAction.TryToProcess:
// That's what happens most of the time, and this is the default action
await ParseChunk(chunk).ConfigureAwait(false);
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}

Expand Down Expand Up @@ -593,6 +611,8 @@ private async Task Init()
return;
}

var chunkAction = ChunkAction.TryToProcess;

if (_config.Crc32 is not null)
{
var crcCalculated = BitConverter.ToUInt32(
Expand All @@ -606,13 +626,17 @@ private async Task Init()
DumpEntityConfiguration(),
chunkConsumed);

throw new CrcException(
$"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " +
$"Chunk Consumed {chunkConsumed}");
// if the user has set the FailAction, we call it
// to allow the user to handle the chunk action
// if the FailAction is not set, we skip the chunk
chunkAction = _config.Crc32.FailAction?.Invoke(this) ?? ChunkAction.Skip;
}
}

await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
// The chunkAction is passed to the _chunksBuffer because the ProcessChunks task
// asks for the credits in a Task. If we skip the chunk here no more credits will be requested
await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, chunkAction), Token)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
Expand Down
9 changes: 9 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ private async Task<IConsumer> StandardConsumer(bool boot)
ConnectionClosedHandler = async (closeReason) =>
{
if (IsClosedNormally(closeReason))
{
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
[_consumerConfig.Stream]);
return;
}

try
{
Expand Down Expand Up @@ -153,7 +157,12 @@ private async Task<IConsumer> SuperConsumer(bool boot)
ConnectionClosedHandler = async (closeReason, partitionStream) =>
{
if (IsClosedNormally(closeReason))
{
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
[partitionStream]);
return;
}

await RandomWait().ConfigureAwait(false);
try
{
Expand Down
9 changes: 9 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally(closeReason))
{
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
[partitionStream]);
return;
}

try
{
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
Expand Down Expand Up @@ -121,7 +126,11 @@ private async Task<IProducer> StandardProducer()
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally())
{
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
[_producerConfig.Stream]);
return;
}

await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
Expand Down
23 changes: 23 additions & 0 deletions RabbitMQ.Stream.Client/StreamCrc32.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;

namespace RabbitMQ.Stream.Client;

/// <summary>
/// Built-in <see cref="ICrc32"/> implementation backed by System.IO.Hashing.Crc32.
/// Consumers are configured with an instance of this class by default, so the CRC32
/// of every chunk received from the server is verified unless the user opts out.
/// </summary>
public class StreamCrc32 : ICrc32
{
    /// <summary>
    /// Optional callback that decides what to do with a chunk whose CRC32 check failed.
    /// It is null by default, in which case the consumer falls back to skipping the chunk.
    /// </summary>
    public Func<IConsumer, ChunkAction> FailAction { get; set; }

    /// <summary>
    /// Computes the CRC32 of <paramref name="data"/> using System.IO.Hashing.Crc32.
    /// </summary>
    /// <param name="data">The bytes to hash.</param>
    /// <returns>The checksum bytes produced by System.IO.Hashing.Crc32.Hash.</returns>
    public byte[] Hash(byte[] data) => System.IO.Hashing.Crc32.Hash(data);
}
15 changes: 0 additions & 15 deletions Tests/Crc32.cs

This file was deleted.

Loading