From f3764f5e25fdcfa3fd304c2849eea3c2fa1ae44b Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 13:16:18 +0200 Subject: [PATCH 01/11] test that consumer offset is committed after flush interval --- pkg/stream/consumer_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index d0a95c5b..b20c081c 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -282,6 +282,35 @@ var _ = Describe("Streaming Consumers", func() { }) + It("commit at flush interval with constant stream of incoming messages", func() { + producer, err := env.NewProducer(streamName, nil) + Expect(err).NotTo(HaveOccurred()) + + var messagesReceived int32 = 0 + consumer, err := env.NewConsumer(streamName, + func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&messagesReceived, 1) + }, NewConsumerOptions(). + SetAutoCommit(NewAutoCommitStrategy(). + SetCountBeforeStorage(10000000). + SetFlushInterval(time.Second))) + Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < 20; i++ { + Expect(producer.Send(CreateMessageForTesting("", i))).NotTo(HaveOccurred()) + // emit message before the flush interval has elapsed + time.Sleep(time.Millisecond * 100) + + if consumer.GetLastStoredOffset() > 0 { + break + } + } + + Expect(messagesReceived < 15).To(BeTrue()) + Expect(producer.Close()).NotTo(HaveOccurred()) + Expect(consumer.Close()).NotTo(HaveOccurred()) + }) + It("Deduplication", func() { producerName := "producer-ded" producer, err := env.NewProducer(streamName, NewProducerOptions().SetProducerName(producerName)) From 4572555fde879a912b0899113008ed7b244197a8 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 13:29:52 +0200 Subject: [PATCH 02/11] fix auto commit flush interval not triggered with constant stream of incoming messages --- pkg/stream/client.go | 4 ++-- pkg/stream/consumer.go | 4 ++++ pkg/stream/coordinator.go | 15 ++++++++------- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 08a845aa..38cd255c 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -982,7 +982,8 @@ func (c *Client) DeclareSubscriber(streamName string, } if consumer.options.autocommit { consumer.messageCountBeforeStorage += 1 - if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage { + if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage || + time.Now().Sub(consumer.lastAutoCommitStored) >= consumer.options.autoCommitStrategy.flushInterval { consumer.cacheStoreOffset() consumer.messageCountBeforeStorage = 0 } @@ -991,7 +992,6 @@ func (c *Client) DeclareSubscriber(streamName string, case <-time.After(consumer.options.autoCommitStrategy.flushInterval): consumer.cacheStoreOffset() - } } }() diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index f310f834..abca4a14 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -36,6 +36,9 @@ type Consumer struct { // is in waiting mode or not. // in normal mode, the consumer is always isPromotedAsActive==true isPromotedAsActive bool + + // lastAutoCommitStored tracks when the offset was last flushed + lastAutoCommitStored time.Time } func (consumer *Consumer) setStatus(status int) { @@ -359,6 +362,7 @@ func (consumer *Consumer) Close() error { func (consumer *Consumer) cacheStoreOffset() { if consumer.options.autocommit { + consumer.lastAutoCommitStored = time.Now() err := consumer.internalStoreOffset() if err != nil { logs.LogError("cache Store Offset error : %s", err) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 9ab68fd7..0d90c64b 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -198,13 +198,14 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, defer coordinator.mutex.Unlock() var lastId, _ = coordinator.getNextConsumerItem() var item = &Consumer{ID: lastId, options: parameters, - response: newResponse(lookUpCommand(commandSubscribe)), - status: open, - mutex: &sync.Mutex{}, - MessagesHandler: messagesHandler, - currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled - lastStoredOffset: -1, // because 0 is a valid value for the offset - isPromotedAsActive: true, + response: newResponse(lookUpCommand(commandSubscribe)), + status: open, + mutex: &sync.Mutex{}, + MessagesHandler: messagesHandler, + currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled + lastStoredOffset: -1, // because 0 is a valid value for the offset + isPromotedAsActive: true, + lastAutoCommitStored: time.Now(), } coordinator.consumers[lastId] = item From fca87f2d77f4a2251390312dac63cda8ca3a2496 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 13:36:06 +0200 Subject: [PATCH 03/11] use time.Since() --- pkg/stream/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 38cd255c..627b598f 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -983,7 +983,7 @@ func (c *Client) DeclareSubscriber(streamName string, if consumer.options.autocommit { consumer.messageCountBeforeStorage += 1 if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage || - time.Now().Sub(consumer.lastAutoCommitStored) >= consumer.options.autoCommitStrategy.flushInterval { + time.Since(consumer.lastAutoCommitStored) >= consumer.options.autoCommitStrategy.flushInterval { consumer.cacheStoreOffset() consumer.messageCountBeforeStorage = 0 } From 8f6c4c9083206f315bb66e4d8d6becf23a5e1208 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 14:05:36 +0200 Subject: [PATCH 04/11] update ginkgo as test summary is not showing https://github.com/onsi/ginkgo/issues/973 --- go.mod | 17 +++++++++-------- go.sum | 19 +++++++++++++++++++ pkg/stream/consumer_test.go | 2 +- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index a32cea5d..c8c7c5a1 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/uuid v1.6.0 github.com/klauspost/compress v1.17.9 - github.com/onsi/ginkgo/v2 v2.13.0 - github.com/onsi/gomega v1.28.0 + github.com/onsi/ginkgo/v2 v2.19.0 + github.com/onsi/gomega v1.33.1 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 github.com/spaolacci/murmur3 v1.1.0 @@ -16,17 +16,18 @@ require ( require ( github.com/frankban/quicktest v1.14.6 // indirect - github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect + github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/net v0.23.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.14.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + golang.org/x/tools v0.21.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b97fceb5..67af0d6f 100644 --- a/go.sum +++ b/go.sum @@ -7,8 +7,12 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -17,6 +21,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 h1:pUa4ghanp6q4IJHwE9RwLgmVFfReJN+KbQ8ExNEUUoQ= github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -32,8 +38,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c= github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -55,14 +65,23 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= +golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw= +golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index b20c081c..edec60a1 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -306,7 +306,7 @@ var _ = Describe("Streaming Consumers", func() { } } - Expect(messagesReceived < 15).To(BeTrue()) + Expect(messagesReceived > 5 && messagesReceived < 15).To(BeTrue()) Expect(producer.Close()).NotTo(HaveOccurred()) Expect(consumer.Close()).NotTo(HaveOccurred()) }) From 4fd81d3e339d26852bae51fc0551bf7f1cbb3a9a Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 14:22:45 +0200 Subject: [PATCH 05/11] add test reason with number of messages received --- pkg/stream/consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index edec60a1..090a0e30 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -306,7 +306,7 @@ var _ = Describe("Streaming Consumers", func() { } } - Expect(messagesReceived > 5 && messagesReceived < 15).To(BeTrue()) + Expect(messagesReceived > 5 && messagesReceived < 15).To(BeTrueBecause("%d messages received", messagesReceived)) Expect(producer.Close()).NotTo(HaveOccurred()) Expect(consumer.Close()).NotTo(HaveOccurred()) }) From 5b4d1513ff664ddb9e529e9e6f79d1550822b61d Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 14:40:26 +0200 Subject: [PATCH 06/11] try up to 50 messages --- pkg/stream/consumer_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index 090a0e30..cdab829b 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -296,7 +296,8 @@ var _ = Describe("Streaming Consumers", func() { SetFlushInterval(time.Second))) Expect(err).NotTo(HaveOccurred()) - for i := 0; i < 20; i++ { + maxMessages := 50 + for i := 0; i < maxMessages; i++ { Expect(producer.Send(CreateMessageForTesting("", i))).NotTo(HaveOccurred()) // emit message before the flush interval has elapsed time.Sleep(time.Millisecond * 100) @@ -306,7 +307,7 @@ var _ = Describe("Streaming Consumers", func() { } } - Expect(messagesReceived > 5 && messagesReceived < 15).To(BeTrueBecause("%d messages received", messagesReceived)) + Expect(messagesReceived > 5 && messagesReceived < int32(maxMessages)).To(BeTrueBecause("%d messages received", messagesReceived)) Expect(producer.Close()).NotTo(HaveOccurred()) Expect(consumer.Close()).NotTo(HaveOccurred()) }) From 6841253fc858f2e6d0e3c94609e9e825052e384f Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 14:57:09 +0200 Subject: [PATCH 07/11] fix data race --- pkg/stream/client.go | 2 +- pkg/stream/consumer.go | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 627b598f..94de03f7 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -983,7 +983,7 @@ func (c *Client) DeclareSubscriber(streamName string, if consumer.options.autocommit { consumer.messageCountBeforeStorage += 1 if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage || - time.Since(consumer.lastAutoCommitStored) >= consumer.options.autoCommitStrategy.flushInterval { + time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval { consumer.cacheStoreOffset() consumer.messageCountBeforeStorage = 0 } diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index abca4a14..3981b5ca 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -362,7 +362,10 @@ func (consumer *Consumer) Close() error { func (consumer *Consumer) cacheStoreOffset() { if consumer.options.autocommit { + consumer.mutex.Lock() consumer.lastAutoCommitStored = time.Now() + consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutex, so not using defer for unlock + err := consumer.internalStoreOffset() if err != nil { logs.LogError("cache Store Offset error : %s", err) @@ -370,6 +373,12 @@ func (consumer *Consumer) cacheStoreOffset() { } } +func (consumer *Consumer) getLastAutoCommitStored() time.Time { + consumer.mutex.Lock() + defer consumer.mutex.Unlock() + return consumer.lastAutoCommitStored +} + func (consumer *Consumer) StoreOffset() error { return consumer.internalStoreOffset() } From 70855d9cc4f6352db2b96823324aba394db96076 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Mon, 22 Jul 2024 15:10:23 +0200 Subject: [PATCH 08/11] tidy --- go.mod | 1 - go.sum | 31 ++----------------------------- 2 files changed, 2 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index c8c7c5a1..723e75f4 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( require ( github.com/frankban/quicktest v1.14.6 // indirect github.com/go-logr/logr v1.4.1 // indirect - github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect diff --git a/go.sum b/go.sum index 67af0d6f..a26a28ac 100644 --- a/go.sum +++ b/go.sum @@ -1,26 +1,17 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 h1:pUa4ghanp6q4IJHwE9RwLgmVFfReJN+KbQ8ExNEUUoQ= -github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -36,12 +27,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= -github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= -github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c= -github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= @@ -50,7 +37,6 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsK github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= @@ -61,31 +47,18 @@ github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= -golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw= golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From a2936051ce88f294fbb541825893afe75ee10c0a Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Tue, 23 Jul 2024 17:16:16 +0200 Subject: [PATCH 09/11] move resetting messageCountBeforeStorage to cacheStoreOffset() --- pkg/stream/client.go | 1 - pkg/stream/consumer.go | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 94de03f7..4205c60a 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -985,7 +985,6 @@ func (c *Client) DeclareSubscriber(streamName string, if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage || time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval { consumer.cacheStoreOffset() - consumer.messageCountBeforeStorage = 0 } } } diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 3981b5ca..03b79a94 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -364,6 +364,7 @@ func (consumer *Consumer) cacheStoreOffset() { if consumer.options.autocommit { consumer.mutex.Lock() consumer.lastAutoCommitStored = time.Now() + consumer.messageCountBeforeStorage = 0 consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutex, so not using defer for unlock err := consumer.internalStoreOffset() From efe00b95ccc31f3694090a671e5262170ed4e08d Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Tue, 23 Jul 2024 17:24:42 +0200 Subject: [PATCH 10/11] fix potential data race with messageCountBeforeStorage when closing consumer --- pkg/stream/client.go | 4 ++-- pkg/stream/consumer.go | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 4205c60a..ed3fe378 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -981,8 +981,8 @@ func (c *Client) DeclareSubscriber(streamName string, consumer.MessagesHandler(ConsumerContext{Consumer: consumer, chunkInfo: &chunk}, offMessage.message) } if consumer.options.autocommit { - consumer.messageCountBeforeStorage += 1 - if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage || + consumer.increaseMessageCountBeforeStorage() + if consumer.getMessageCountBeforeStorage() >= consumer.options.autoCommitStrategy.messageCountBeforeStorage || time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval { consumer.cacheStoreOffset() } diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 03b79a94..3d973157 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -374,6 +374,18 @@ func (consumer *Consumer) cacheStoreOffset() { } } +func (consumer *Consumer) getMessageCountBeforeStorage() int { + consumer.mutex.Lock() + defer consumer.mutex.Unlock() + return consumer.messageCountBeforeStorage +} + +func (consumer *Consumer) increaseMessageCountBeforeStorage() { + consumer.mutex.Lock() + defer consumer.mutex.Unlock() + consumer.messageCountBeforeStorage += 1 +} + func (consumer *Consumer) getLastAutoCommitStored() time.Time { consumer.mutex.Lock() defer consumer.mutex.Unlock() From f76ca3c0cdbb93e6b0317c8f5a30b2d9061563f0 Mon Sep 17 00:00:00 2001 From: Erik-Jan Rieksen Date: Tue, 23 Jul 2024 17:31:59 +0200 Subject: [PATCH 11/11] have increaseMessageCountBeforeStorage() return current messageCountBeforeStorage so that we don't need getMessageCountBeforeStorage() and request the mutex twice --- pkg/stream/client.go | 4 ++-- pkg/stream/consumer.go | 9 ++------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index ed3fe378..a5adc857 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -981,8 +981,8 @@ func (c *Client) DeclareSubscriber(streamName string, consumer.MessagesHandler(ConsumerContext{Consumer: consumer, chunkInfo: &chunk}, offMessage.message) } if consumer.options.autocommit { - consumer.increaseMessageCountBeforeStorage() - if consumer.getMessageCountBeforeStorage() >= consumer.options.autoCommitStrategy.messageCountBeforeStorage || + messageCountBeforeStorage := consumer.increaseMessageCountBeforeStorage() + if messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage || time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval { consumer.cacheStoreOffset() } diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 3d973157..0fc20c30 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -374,16 +374,11 @@ func (consumer *Consumer) cacheStoreOffset() { } } -func (consumer *Consumer) getMessageCountBeforeStorage() int { - consumer.mutex.Lock() - defer consumer.mutex.Unlock() - return consumer.messageCountBeforeStorage -} - -func (consumer *Consumer) increaseMessageCountBeforeStorage() { +func (consumer *Consumer) increaseMessageCountBeforeStorage() int { consumer.mutex.Lock() defer consumer.mutex.Unlock() consumer.messageCountBeforeStorage += 1 + return consumer.messageCountBeforeStorage } func (consumer *Consumer) getLastAutoCommitStored() time.Time {