diff --git a/pkg/integration_test/stream_integration_test.go b/pkg/integration_test/stream_integration_test.go index aaa37710..0b1717cb 100644 --- a/pkg/integration_test/stream_integration_test.go +++ b/pkg/integration_test/stream_integration_test.go @@ -1,7 +1,9 @@ package integration_test import ( + "errors" "fmt" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "sync" "time" @@ -9,7 +11,6 @@ import ( . "github.com/onsi/gomega" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" stream "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" ) @@ -146,4 +147,97 @@ var _ = Describe("StreamIntegration", func() { Expect(consumer.GetOffset()).To(BeNumerically("==", expectedCurrentOffset)) }) }) + + Context("Initial timestamp offset when no messages exist", func() { + var ( + addresses []string = []string{ + "rabbitmq-stream://guest:guest@localhost:5552/"} + streamName string = "empty-test-stream" + streamEnv *stream.Environment + ) + + // init empty stream + BeforeEach(func() { + var err error + streamEnv, err = stream.NewEnvironment( + stream.NewEnvironmentOptions().SetUris(addresses)) + Expect(err).ToNot(HaveOccurred()) + + err = streamEnv.DeclareStream(streamName, + stream.NewStreamOptions().SetMaxLengthBytes(stream.ByteCapacity{}.GB(2))) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + Expect(streamEnv.DeleteStream(streamName)). + To(SatisfyAny( + Succeed(), + MatchError(stream.StreamDoesNotExist), + )) + }) + + It("correctly handles offsets using timestamps when no messages exist", func() { + var err error + const consumerName = "timestamp-offset-consumer" + + lastMinute := time.Now().Add(-time.Minute).UnixMilli() + + // Implement the UpdateConsumer function to return a timestamp-based offset if no offset exists + // For example, we add a new consumer to the incoming stream and don't want to reread it from the beginning. + updateConsumer := func(streamName string, isActive bool) stream.OffsetSpecification { + offset, err := streamEnv.QueryOffset(consumerName, streamName) + if errors.Is(err, stream.OffsetNotFoundError) { + return stream.OffsetSpecification{}.Timestamp(lastMinute) + } + + Expect(err).ToNot(HaveOccurred()) + + return stream.OffsetSpecification{}.Offset(offset + 1) + } + + options := stream.NewConsumerOptions(). + SetConsumerName(consumerName). + SetAutoCommit(stream.NewAutoCommitStrategy(). + SetFlushInterval(time.Second)). + SetSingleActiveConsumer(stream.NewSingleActiveConsumer(updateConsumer)) + + // Create the consumer + consumer, err := streamEnv.NewConsumer( + streamName, + func(ctx stream.ConsumerContext, msg *amqp.Message) {}, + options, + ) + Expect(err).NotTo(HaveOccurred()) + + // Wait for a flush without messages + // An incorrect offset is stored during this flush + time.Sleep(time.Millisecond * 1200) + Expect(consumer.Close()).ToNot(HaveOccurred()) + + // Re-create the consumer + consumeIsStarted := make(chan struct{}) + handleMessages := func(ctx stream.ConsumerContext, msg *amqp.Message) { + close(consumeIsStarted) + } + + consumer, err = streamEnv.NewConsumer(streamName, handleMessages, options) + Expect(err).NotTo(HaveOccurred()) + + producer, err := streamEnv.NewProducer(streamName, nil) + Expect(err).ToNot(HaveOccurred()) + body := `{"name": "item-1}` + err = producer.Send(amqp.NewMessage([]byte(body))) + Expect(err).ToNot(HaveOccurred()) + + // check if messages are consumed + select { + case <-consumeIsStarted: + case <-time.After(time.Second * 1): + Fail("Timeout waiting for consumer to start") + } + + Expect(consumer.GetOffset()).To(BeNumerically("<=", 0)) + }) + + }) }) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 9b25ae54..a6c0f0ae 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -926,7 +926,7 @@ func (c *Client) declareSubscriber(streamName string, // copy the option offset to the consumer offset // the option.offset won't change ( in case we need to retrive the original configuration) // consumer.current offset will be moved when reading - if !options.IsSingleActiveConsumerEnabled() { + if !options.IsSingleActiveConsumerEnabled() && options.Offset.isOffset() { consumer.setCurrentOffset(options.Offset.offset) } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index d3a323b3..6153eb9d 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -588,7 +588,7 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea isActive == 1) consumer.options.SingleActiveConsumer.offsetSpecification = responseOff - if isActive == 1 { + if isActive == 1 && responseOff.isOffset() { consumer.setCurrentOffset(responseOff.offset) }