diff --git a/pkg/stream/client.go b/pkg/stream/client.go index e8ba9382..08a845aa 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -891,7 +891,9 @@ func (c *Client) DeclareSubscriber(streamName string, // copy the option offset to the consumer offset // the option.offset won't change ( in case we need to retrive the original configuration) // consumer.current offset will be moved when reading - consumer.setCurrentOffset(options.Offset.offset) + if !options.IsSingleActiveConsumerEnabled() { + consumer.setCurrentOffset(options.Offset.offset) + } /// define the consumerOptions consumerProperties := make(map[string]string) diff --git a/pkg/stream/consumer_sac_test.go b/pkg/stream/consumer_sac_test.go index d1bc2dda..23422e2c 100644 --- a/pkg/stream/consumer_sac_test.go +++ b/pkg/stream/consumer_sac_test.go @@ -187,4 +187,64 @@ var _ = Describe("Streaming Single Active Consumer", func() { Expect(c2.Close()).NotTo(HaveOccurred()) }) + It("offset should not be overwritten by autocommit on consumer close when no messages have been consumed", func() { + producer, err := testEnvironment.NewProducer(streamName, nil) + Expect(err).NotTo(HaveOccurred()) + + consumerUpdate := func(streamName string, isActive bool) OffsetSpecification { + offset, err := testEnvironment.QueryOffset("my_consumer", streamName) + if err != nil { + return OffsetSpecification{}.First() + } + + return OffsetSpecification{}.Offset(offset + 1) + } + + var messagesReceived int32 = 0 + consumerA, err := testEnvironment.NewConsumer(streamName, + func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&messagesReceived, 1) + }, NewConsumerOptions(). + SetSingleActiveConsumer(NewSingleActiveConsumer(consumerUpdate)). + SetConsumerName("my_consumer"). + SetAutoCommit(nil)) + Expect(err).NotTo(HaveOccurred()) + + Expect(producer.BatchSend(CreateArrayMessagesForTesting(10))).NotTo(HaveOccurred()) + Eventually(func() int32 { + return atomic.LoadInt32(&messagesReceived) + }, 5*time.Second).Should(Equal(int32(10)), + "consumer should receive only 10 messages") + + Expect(consumerA.Close()).NotTo(HaveOccurred()) + Expect(consumerA.GetLastStoredOffset()).To(Equal(int64(9))) + + offset, err := testEnvironment.QueryOffset("my_consumer", streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(offset).To(Equal(int64(9))) + + messagesReceived = 0 + consumerB, err := testEnvironment.NewConsumer(streamName, + func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&messagesReceived, 1) + }, NewConsumerOptions(). + SetConsumerName("my_consumer"). + SetSingleActiveConsumer(NewSingleActiveConsumer(consumerUpdate)). + SetAutoCommit(nil)) + Expect(err).NotTo(HaveOccurred()) + + Expect(consumerB.Close()).NotTo(HaveOccurred()) + time.Sleep(100 * time.Millisecond) + Eventually(func() int32 { + return atomic.LoadInt32(&messagesReceived) + }, 5*time.Second).Should(Equal(int32(0)), + "consumer should have received no messages") + + offsetAfter, err := testEnvironment.QueryOffset("my_consumer", streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(offsetAfter).To(Equal(int64(9))) + + Expect(producer.Close()).NotTo(HaveOccurred()) + }) + }) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 97647763..9ab68fd7 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -202,6 +202,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, status: open, mutex: &sync.Mutex{}, MessagesHandler: messagesHandler, + currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled lastStoredOffset: -1, // because 0 is a valid value for the offset isPromotedAsActive: true, } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 67a76c35..96f0d8e8 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -603,6 +603,11 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea responseOff := consumer.options.SingleActiveConsumer.ConsumerUpdate(consumer.GetStreamName(), isActive == 1) consumer.options.SingleActiveConsumer.offsetSpecification = responseOff + + if isActive == 1 { + consumer.setCurrentOffset(responseOff.offset) + } + err = consumer.writeConsumeUpdateOffsetToSocket(readProtocol.CorrelationId, responseOff) logErrorCommand(err, "handleConsumerUpdate writeConsumeUpdateOffsetToSocket") }