From fe793815d3a6c7d9646c6ab226dbbfae2bcd9e5b Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 16:22:10 +0200 Subject: [PATCH 01/14] fix consumer offset being reset to 0 when closing consumer when no messages have been consumed --- pkg/stream/coordinator.go | 1 + pkg/stream/server_frame.go | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 97647763..9ab68fd7 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -202,6 +202,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, status: open, mutex: &sync.Mutex{}, MessagesHandler: messagesHandler, + currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled lastStoredOffset: -1, // because 0 is a valid value for the offset isPromotedAsActive: true, } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 67a76c35..4b79ea19 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -600,9 +600,16 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea return } consumer.setPromotedAsActive(isActive == 1) + + // shouldn't this only be run if the consumer is active? responseOff := consumer.options.SingleActiveConsumer.ConsumerUpdate(consumer.GetStreamName(), isActive == 1) consumer.options.SingleActiveConsumer.offsetSpecification = responseOff + + if isActive == 1 { + consumer.currentOffset = responseOff.offset + } + err = consumer.writeConsumeUpdateOffsetToSocket(readProtocol.CorrelationId, responseOff) logErrorCommand(err, "handleConsumerUpdate writeConsumeUpdateOffsetToSocket") } From a9d7bbce0aeba2f2aabc6c2f0e2a06bc28293a34 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 17:03:05 +0200 Subject: [PATCH 02/14] add test --- pkg/stream/consumer_test.go | 50 +++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index d0a95c5b..08448677 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -282,6 +282,56 @@ var _ = Describe("Streaming Consumers", func() { }) + It("offset should not be overwritten by autocommit on consumer close when no messages have been consumed", func() { + producer, err := env.NewProducer(streamName, nil) + Expect(err).NotTo(HaveOccurred()) + + var messagesReceived int32 = 0 + consumer, err := env.NewConsumer(streamName, + func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&messagesReceived, 1) + }, NewConsumerOptions(). + SetOffset(OffsetSpecification{}.First()). + SetConsumerName("my_consumer"). + SetAutoCommit(nil)) + Expect(err).NotTo(HaveOccurred()) + + Expect(producer.BatchSend(CreateArrayMessagesForTesting(10))).NotTo(HaveOccurred()) + Eventually(func() int32 { + return atomic.LoadInt32(&messagesReceived) + }, 5*time.Second).Should(Equal(int32(10)), + "consumer should receive only 10 messages") + + Expect(consumer.Close()).NotTo(HaveOccurred()) + Expect(consumer.GetLastStoredOffset()).To(Equal(int64(9))) + + offset, err := env.QueryOffset("my_consumer", streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(offset).To(Equal(int64(9))) + + messagesReceived = 0 + consumer, err = env.NewConsumer(streamName, + func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&messagesReceived, 1) + }, NewConsumerOptions(). + SetOffset(OffsetSpecification{}.Offset(offset)). + SetConsumerName("my_consumer"). + SetAutoCommit(nil)) + Expect(err).NotTo(HaveOccurred()) + + Expect(consumer.Close()).NotTo(HaveOccurred()) + Eventually(func() int32 { + return atomic.LoadInt32(&messagesReceived) + }, 5*time.Second).Should(Equal(int32(0)), + "consumer should have received no messages") + + offset, err = env.QueryOffset("my_consumer", streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(offset).To(Equal(int64(9))) + + Expect(producer.Close()).NotTo(HaveOccurred()) + }) + It("Deduplication", func() { producerName := "producer-ded" producer, err := env.NewProducer(streamName, NewProducerOptions().SetProducerName(producerName)) From 3b4e4708d935eb6b73075932b6494dd48b6eae0e Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 17:17:03 +0200 Subject: [PATCH 03/14] test that it fails --- pkg/stream/coordinator.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 9ab68fd7..7e3d57c4 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -198,11 +198,11 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, defer coordinator.mutex.Unlock() var lastId, _ = coordinator.getNextConsumerItem() var item = &Consumer{ID: lastId, options: parameters, - response: newResponse(lookUpCommand(commandSubscribe)), - status: open, - mutex: &sync.Mutex{}, - MessagesHandler: messagesHandler, - currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled + response: newResponse(lookUpCommand(commandSubscribe)), + status: open, + mutex: &sync.Mutex{}, + MessagesHandler: messagesHandler, + //currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled lastStoredOffset: -1, // because 0 is a valid value for the offset isPromotedAsActive: true, } From 6dc6b956d5e924bb630915ece76736794b23c191 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 17:31:10 +0200 Subject: [PATCH 04/14] test that it fails --- pkg/stream/consumer_sac_test.go | 60 +++++++++++++++++++++++++++++++++ pkg/stream/consumer_test.go | 50 --------------------------- pkg/stream/server_frame.go | 6 ++-- 3 files changed, 63 insertions(+), 53 deletions(-) diff --git a/pkg/stream/consumer_sac_test.go b/pkg/stream/consumer_sac_test.go index d1bc2dda..b5889d50 100644 --- a/pkg/stream/consumer_sac_test.go +++ b/pkg/stream/consumer_sac_test.go @@ -187,4 +187,64 @@ var _ = Describe("Streaming Single Active Consumer", func() { Expect(c2.Close()).NotTo(HaveOccurred()) }) + It("offset should not be overwritten by autocommit on consumer close when no messages have been consumed", func() { + producer, err := testEnvironment.NewProducer(streamName, nil) + Expect(err).NotTo(HaveOccurred()) + + var messagesReceived int32 = 0 + consumer, err := testEnvironment.NewConsumer(streamName, + func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&messagesReceived, 1) + }, NewConsumerOptions(). + SetOffset(OffsetSpecification{}.First()). + SetConsumerName("my_consumer"). + SetAutoCommit(nil)) + Expect(err).NotTo(HaveOccurred()) + + Expect(producer.BatchSend(CreateArrayMessagesForTesting(10))).NotTo(HaveOccurred()) + Eventually(func() int32 { + return atomic.LoadInt32(&messagesReceived) + }, 5*time.Second).Should(Equal(int32(10)), + "consumer should receive only 10 messages") + + Expect(consumer.Close()).NotTo(HaveOccurred()) + Expect(consumer.GetLastStoredOffset()).To(Equal(int64(9))) + + offset, err := testEnvironment.QueryOffset("my_consumer", streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(offset).To(Equal(int64(9))) + + consumerUpdate := func(streamName string, isActive bool) OffsetSpecification { + offset, err := testEnvironment.QueryOffset("my_consumer", streamName) + if err != nil { + return OffsetSpecification{}.First() + } + + return OffsetSpecification{}.Offset(offset + 1) + } + + messagesReceived = 0 + consumer, err = testEnvironment.NewConsumer(streamName, + func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&messagesReceived, 1) + }, NewConsumerOptions(). + SetOffset(OffsetSpecification{}.Offset(offset)). + SetConsumerName("my_consumer"). + SetSingleActiveConsumer(NewSingleActiveConsumer(consumerUpdate)). + SetAutoCommit(nil)) + Expect(err).NotTo(HaveOccurred()) + + Expect(consumer.Close()).NotTo(HaveOccurred()) + Eventually(func() int32 { + return atomic.LoadInt32(&messagesReceived) + }, 5*time.Second).Should(Equal(int32(0)), + "consumer should have received no messages") + + offset, err = testEnvironment.QueryOffset("my_consumer", streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(offset).To(Equal(int64(9))) + + Expect(producer.Close()).NotTo(HaveOccurred()) + }) + }) diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index 08448677..d0a95c5b 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -282,56 +282,6 @@ var _ = Describe("Streaming Consumers", func() { }) - It("offset should not be overwritten by autocommit on consumer close when no messages have been consumed", func() { - producer, err := env.NewProducer(streamName, nil) - Expect(err).NotTo(HaveOccurred()) - - var messagesReceived int32 = 0 - consumer, err := env.NewConsumer(streamName, - func(consumerContext ConsumerContext, message *amqp.Message) { - atomic.AddInt32(&messagesReceived, 1) - }, NewConsumerOptions(). - SetOffset(OffsetSpecification{}.First()). - SetConsumerName("my_consumer"). - SetAutoCommit(nil)) - Expect(err).NotTo(HaveOccurred()) - - Expect(producer.BatchSend(CreateArrayMessagesForTesting(10))).NotTo(HaveOccurred()) - Eventually(func() int32 { - return atomic.LoadInt32(&messagesReceived) - }, 5*time.Second).Should(Equal(int32(10)), - "consumer should receive only 10 messages") - - Expect(consumer.Close()).NotTo(HaveOccurred()) - Expect(consumer.GetLastStoredOffset()).To(Equal(int64(9))) - - offset, err := env.QueryOffset("my_consumer", streamName) - Expect(err).NotTo(HaveOccurred()) - Expect(offset).To(Equal(int64(9))) - - messagesReceived = 0 - consumer, err = env.NewConsumer(streamName, - func(consumerContext ConsumerContext, message *amqp.Message) { - atomic.AddInt32(&messagesReceived, 1) - }, NewConsumerOptions(). - SetOffset(OffsetSpecification{}.Offset(offset)). - SetConsumerName("my_consumer"). - SetAutoCommit(nil)) - Expect(err).NotTo(HaveOccurred()) - - Expect(consumer.Close()).NotTo(HaveOccurred()) - Eventually(func() int32 { - return atomic.LoadInt32(&messagesReceived) - }, 5*time.Second).Should(Equal(int32(0)), - "consumer should have received no messages") - - offset, err = env.QueryOffset("my_consumer", streamName) - Expect(err).NotTo(HaveOccurred()) - Expect(offset).To(Equal(int64(9))) - - Expect(producer.Close()).NotTo(HaveOccurred()) - }) - It("Deduplication", func() { producerName := "producer-ded" producer, err := env.NewProducer(streamName, NewProducerOptions().SetProducerName(producerName)) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 4b79ea19..39f00998 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -606,9 +606,9 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea isActive == 1) consumer.options.SingleActiveConsumer.offsetSpecification = responseOff - if isActive == 1 { - consumer.currentOffset = responseOff.offset - } + //if isActive == 1 { + // consumer.currentOffset = responseOff.offset + //} err = consumer.writeConsumeUpdateOffsetToSocket(readProtocol.CorrelationId, responseOff) logErrorCommand(err, "handleConsumerUpdate writeConsumeUpdateOffsetToSocket") From e0b27a9dae6ed2f5d9e46b3db60567eb730b66a4 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 17:40:06 +0200 Subject: [PATCH 05/14] remove offset specification here for SAC --- pkg/stream/consumer_sac_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/stream/consumer_sac_test.go b/pkg/stream/consumer_sac_test.go index b5889d50..16b0fbf3 100644 --- a/pkg/stream/consumer_sac_test.go +++ b/pkg/stream/consumer_sac_test.go @@ -228,7 +228,6 @@ var _ = Describe("Streaming Single Active Consumer", func() { func(consumerContext ConsumerContext, message *amqp.Message) { atomic.AddInt32(&messagesReceived, 1) }, NewConsumerOptions(). - SetOffset(OffsetSpecification{}.Offset(offset)). SetConsumerName("my_consumer"). SetSingleActiveConsumer(NewSingleActiveConsumer(consumerUpdate)). SetAutoCommit(nil)) From 0533a6257e7695f22260737e7494a9002fd16119 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 17:44:20 +0200 Subject: [PATCH 06/14] initializing currentOffset with -1 should resolve the issue --- pkg/stream/coordinator.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 7e3d57c4..9ab68fd7 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -198,11 +198,11 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, defer coordinator.mutex.Unlock() var lastId, _ = coordinator.getNextConsumerItem() var item = &Consumer{ID: lastId, options: parameters, - response: newResponse(lookUpCommand(commandSubscribe)), - status: open, - mutex: &sync.Mutex{}, - MessagesHandler: messagesHandler, - //currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled + response: newResponse(lookUpCommand(commandSubscribe)), + status: open, + mutex: &sync.Mutex{}, + MessagesHandler: messagesHandler, + currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled lastStoredOffset: -1, // because 0 is a valid value for the offset isPromotedAsActive: true, } From f632496f2e3a614ac5c2b0704329177deb83a57c Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 17:49:58 +0200 Subject: [PATCH 07/14] fix current offset for SAC --- pkg/stream/coordinator.go | 1 - pkg/stream/server_frame.go | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 9ab68fd7..97647763 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -202,7 +202,6 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, status: open, mutex: &sync.Mutex{}, MessagesHandler: messagesHandler, - currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled lastStoredOffset: -1, // because 0 is a valid value for the offset isPromotedAsActive: true, } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 39f00998..4b79ea19 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -606,9 +606,9 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea isActive == 1) consumer.options.SingleActiveConsumer.offsetSpecification = responseOff - //if isActive == 1 { - // consumer.currentOffset = responseOff.offset - //} + if isActive == 1 { + consumer.currentOffset = responseOff.offset + } err = consumer.writeConsumeUpdateOffsetToSocket(readProtocol.CorrelationId, responseOff) logErrorCommand(err, "handleConsumerUpdate writeConsumeUpdateOffsetToSocket") From 4570157df99fa066af7e495c307f9f83a0ee45ac Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 17:54:00 +0200 Subject: [PATCH 08/14] initialize currentOffset with -1 --- pkg/stream/coordinator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 97647763..9ab68fd7 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -202,6 +202,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, status: open, mutex: &sync.Mutex{}, MessagesHandler: messagesHandler, + currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled lastStoredOffset: -1, // because 0 is a valid value for the offset isPromotedAsActive: true, } From fe928c3637a383bdb2525e75d9fc9bd9b39af98e Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 18:02:57 +0200 Subject: [PATCH 09/14] use setCurrentOffset() which handles locking --- pkg/stream/server_frame.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 4b79ea19..b460863f 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -607,7 +607,7 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea consumer.options.SingleActiveConsumer.offsetSpecification = responseOff if isActive == 1 { - consumer.currentOffset = responseOff.offset + consumer.setCurrentOffset(responseOff.offset) } err = consumer.writeConsumeUpdateOffsetToSocket(readProtocol.CorrelationId, responseOff) From b28b0e846a9016ab481fcc34c4991b2d6d59874a Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 18:10:26 +0200 Subject: [PATCH 10/14] sometimes we all need a little sleep --- pkg/stream/consumer_sac_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/stream/consumer_sac_test.go b/pkg/stream/consumer_sac_test.go index 16b0fbf3..7bd5ea36 100644 --- a/pkg/stream/consumer_sac_test.go +++ b/pkg/stream/consumer_sac_test.go @@ -234,6 +234,7 @@ var _ = Describe("Streaming Single Active Consumer", func() { Expect(err).NotTo(HaveOccurred()) Expect(consumer.Close()).NotTo(HaveOccurred()) + time.Sleep(100 * time.Millisecond) Eventually(func() int32 { return atomic.LoadInt32(&messagesReceived) }, 5*time.Second).Should(Equal(int32(0)), From cb3e06958e5767f05451e141aa0ab5dd210eae99 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 18:16:12 +0200 Subject: [PATCH 11/14] refactor --- pkg/stream/consumer_sac_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/stream/consumer_sac_test.go b/pkg/stream/consumer_sac_test.go index 7bd5ea36..e3b68fb2 100644 --- a/pkg/stream/consumer_sac_test.go +++ b/pkg/stream/consumer_sac_test.go @@ -192,7 +192,7 @@ var _ = Describe("Streaming Single Active Consumer", func() { Expect(err).NotTo(HaveOccurred()) var messagesReceived int32 = 0 - consumer, err := testEnvironment.NewConsumer(streamName, + consumerA, err := testEnvironment.NewConsumer(streamName, func(consumerContext ConsumerContext, message *amqp.Message) { atomic.AddInt32(&messagesReceived, 1) }, NewConsumerOptions(). @@ -207,8 +207,8 @@ var _ = Describe("Streaming Single Active Consumer", func() { }, 5*time.Second).Should(Equal(int32(10)), "consumer should receive only 10 messages") - Expect(consumer.Close()).NotTo(HaveOccurred()) - Expect(consumer.GetLastStoredOffset()).To(Equal(int64(9))) + Expect(consumerA.Close()).NotTo(HaveOccurred()) + Expect(consumerA.GetLastStoredOffset()).To(Equal(int64(9))) offset, err := testEnvironment.QueryOffset("my_consumer", streamName) Expect(err).NotTo(HaveOccurred()) @@ -224,7 +224,7 @@ var _ = Describe("Streaming Single Active Consumer", func() { } messagesReceived = 0 - consumer, err = testEnvironment.NewConsumer(streamName, + consumerB, err := testEnvironment.NewConsumer(streamName, func(consumerContext ConsumerContext, message *amqp.Message) { atomic.AddInt32(&messagesReceived, 1) }, NewConsumerOptions(). @@ -233,16 +233,16 @@ var _ = Describe("Streaming Single Active Consumer", func() { SetAutoCommit(nil)) Expect(err).NotTo(HaveOccurred()) - Expect(consumer.Close()).NotTo(HaveOccurred()) + Expect(consumerB.Close()).NotTo(HaveOccurred()) time.Sleep(100 * time.Millisecond) Eventually(func() int32 { return atomic.LoadInt32(&messagesReceived) }, 5*time.Second).Should(Equal(int32(0)), "consumer should have received no messages") - offset, err = testEnvironment.QueryOffset("my_consumer", streamName) + offsetAfter, err := testEnvironment.QueryOffset("my_consumer", streamName) Expect(err).NotTo(HaveOccurred()) - Expect(offset).To(Equal(int64(9))) + Expect(offsetAfter).To(Equal(int64(9))) Expect(producer.Close()).NotTo(HaveOccurred()) }) From 7087a116742033e128b830bef51f845bea8a37f2 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 18:19:28 +0200 Subject: [PATCH 12/14] make both consumers SAC in test --- pkg/stream/consumer_sac_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/stream/consumer_sac_test.go b/pkg/stream/consumer_sac_test.go index e3b68fb2..23422e2c 100644 --- a/pkg/stream/consumer_sac_test.go +++ b/pkg/stream/consumer_sac_test.go @@ -191,12 +191,21 @@ var _ = Describe("Streaming Single Active Consumer", func() { producer, err := testEnvironment.NewProducer(streamName, nil) Expect(err).NotTo(HaveOccurred()) + consumerUpdate := func(streamName string, isActive bool) OffsetSpecification { + offset, err := testEnvironment.QueryOffset("my_consumer", streamName) + if err != nil { + return OffsetSpecification{}.First() + } + + return OffsetSpecification{}.Offset(offset + 1) + } + var messagesReceived int32 = 0 consumerA, err := testEnvironment.NewConsumer(streamName, func(consumerContext ConsumerContext, message *amqp.Message) { atomic.AddInt32(&messagesReceived, 1) }, NewConsumerOptions(). - SetOffset(OffsetSpecification{}.First()). + SetSingleActiveConsumer(NewSingleActiveConsumer(consumerUpdate)). SetConsumerName("my_consumer"). SetAutoCommit(nil)) Expect(err).NotTo(HaveOccurred()) @@ -214,15 +223,6 @@ var _ = Describe("Streaming Single Active Consumer", func() { Expect(err).NotTo(HaveOccurred()) Expect(offset).To(Equal(int64(9))) - consumerUpdate := func(streamName string, isActive bool) OffsetSpecification { - offset, err := testEnvironment.QueryOffset("my_consumer", streamName) - if err != nil { - return OffsetSpecification{}.First() - } - - return OffsetSpecification{}.Offset(offset + 1) - } - messagesReceived = 0 consumerB, err := testEnvironment.NewConsumer(streamName, func(consumerContext ConsumerContext, message *amqp.Message) { From 805db8cbcfa1aee7c5109f3db1c835a9950a5de5 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Sun, 21 Jul 2024 19:35:24 +0200 Subject: [PATCH 13/14] fix SAC offset being initialized with default next offset SAC offset should be initialized by the ConsumerUpdate func of SAC --- pkg/stream/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index e8ba9382..08a845aa 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -891,7 +891,9 @@ func (c *Client) DeclareSubscriber(streamName string, // copy the option offset to the consumer offset // the option.offset won't change ( in case we need to retrive the original configuration) // consumer.current offset will be moved when reading - consumer.setCurrentOffset(options.Offset.offset) + if !options.IsSingleActiveConsumerEnabled() { + consumer.setCurrentOffset(options.Offset.offset) + } /// define the consumerOptions consumerProperties := make(map[string]string) From 9fcbfe6a744fe2c30c18e8e927060be90bdbcbdf Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 11:11:03 +0200 Subject: [PATCH 14/14] remove comment --- pkg/stream/server_frame.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index b460863f..96f0d8e8 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -600,8 +600,6 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea return } consumer.setPromotedAsActive(isActive == 1) - - // shouldn't this only be run if the consumer is active? responseOff := consumer.options.SingleActiveConsumer.ConsumerUpdate(consumer.GetStreamName(), isActive == 1) consumer.options.SingleActiveConsumer.offsetSpecification = responseOff