diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index 5b32043a..e0f07332 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -75,13 +75,14 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, }, MessageHandler = async (consumer, ctx, message) => { - _consumedFirstTime = true; - _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; if (_consumerConfig.MessageHandler != null) { await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message) .ConfigureAwait(false); } + + _consumedFirstTime = true; + _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; }, }, BaseLogger).ConfigureAwait(false); } @@ -146,13 +147,14 @@ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r, }, MessageHandler = async (partitionStream, consumer, ctx, message) => { - _consumedFirstTime = true; - _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; if (_consumerConfig.MessageHandler != null) { await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, message).ConfigureAwait(false); } + + _consumedFirstTime = true; + _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; }, }, BaseLogger).ConfigureAwait(false); }