From 8974cde535161a86cd827ef4bd06b63a2bce954e Mon Sep 17 00:00:00 2001 From: Falk Jonas - HK Date: Fri, 21 Mar 2025 10:10:46 +0100 Subject: [PATCH 1/3] Currently, the last consumed offset is stored before the message handler is called. This change does it after the messagehandler is called in order to avoid potential messageloss on the event of connection-issues/reconnecting-consumer --- RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index 5b32043a..a681bf2d 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -76,12 +76,13 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, MessageHandler = async (consumer, ctx, message) => { _consumedFirstTime = true; - _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; if (_consumerConfig.MessageHandler != null) { await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message) .ConfigureAwait(false); } + _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; + }, }, BaseLogger).ConfigureAwait(false); } From 3ad1254c17778b1e0327dbff369c88e4df0f1efa Mon Sep 17 00:00:00 2001 From: Falk Jonas - HK Date: Wed, 26 Mar 2025 07:19:50 +0100 Subject: [PATCH 2/3] Aligned the fix for consumers with superstream consumers --- RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index a681bf2d..85328c1a 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -75,12 +75,12 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, }, MessageHandler = async (consumer, ctx, message) => { - _consumedFirstTime = true; if (_consumerConfig.MessageHandler != null) { await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message) .ConfigureAwait(false); } + _consumedFirstTime = true; _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; }, @@ -147,13 +147,14 @@ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r, }, MessageHandler = async (partitionStream, consumer, ctx, message) => { - _consumedFirstTime = true; - _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; if (_consumerConfig.MessageHandler != null) { await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, message).ConfigureAwait(false); } + _consumedFirstTime = true; + _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; + }, }, BaseLogger).ConfigureAwait(false); } From b6d065038648645716b2c6eaae3735bbea4b43dd Mon Sep 17 00:00:00 2001 From: Falk Jonas - HK Date: Wed, 26 Mar 2025 08:17:11 +0100 Subject: [PATCH 3/3] code format --- RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index 85328c1a..e0f07332 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -80,9 +80,9 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message) .ConfigureAwait(false); } + _consumedFirstTime = true; _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; - }, }, BaseLogger).ConfigureAwait(false); } @@ -152,9 +152,9 @@ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r, await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, message).ConfigureAwait(false); } + _consumedFirstTime = true; _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; - }, }, BaseLogger).ConfigureAwait(false); }