Skip to content

Commit 34bcc2d

Browse files
Fix flush interval (#338)
* test that consumer offset is committed after flush interval * fix auto commit flush interval not triggered with constant stream of incoming messages * use time.Since() * update ginkgo as test summary is not showing onsi/ginkgo#973 * add test reason with number of messages received * try up to 50 messages * fix data race * tidy * move resetting messageCountBeforeStorage to cacheStoreOffset() * fix potential data race with messageCountBeforeStorage when closing consumer * have increaseMessageCountBeforeStorage() return current messageCountBeforeStorage so that we don't need getMessageCountBeforeStorage() and request the mutex twice --------- Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent cb4e0d2 commit 34bcc2d

File tree

6 files changed

+91
-48
lines changed

6 files changed

+91
-48
lines changed

go.mod

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ require (
66
github.com/golang/snappy v0.0.4
77
github.com/google/uuid v1.6.0
88
github.com/klauspost/compress v1.17.9
9-
github.com/onsi/ginkgo/v2 v2.13.0
10-
github.com/onsi/gomega v1.28.0
9+
github.com/onsi/ginkgo/v2 v2.19.0
10+
github.com/onsi/gomega v1.33.1
1111
github.com/pierrec/lz4 v2.6.1+incompatible
1212
github.com/pkg/errors v0.9.1
1313
github.com/spaolacci/murmur3 v1.1.0
@@ -16,17 +16,17 @@ require (
1616

1717
require (
1818
github.com/frankban/quicktest v1.14.6 // indirect
19-
github.com/go-logr/logr v1.2.4 // indirect
20-
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
19+
github.com/go-logr/logr v1.4.1 // indirect
20+
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
2121
github.com/google/go-cmp v0.6.0 // indirect
22-
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect
22+
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect
2323
github.com/inconshreveable/mousetrap v1.1.0 // indirect
2424
github.com/rogpeppe/go-internal v1.11.0 // indirect
2525
github.com/spf13/pflag v1.0.5 // indirect
26-
golang.org/x/net v0.23.0 // indirect
27-
golang.org/x/sys v0.18.0 // indirect
28-
golang.org/x/text v0.14.0 // indirect
29-
golang.org/x/tools v0.14.0 // indirect
26+
golang.org/x/net v0.25.0 // indirect
27+
golang.org/x/sys v0.20.0 // indirect
28+
golang.org/x/text v0.15.0 // indirect
29+
golang.org/x/tools v0.21.0 // indirect
3030
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
3131
gopkg.in/yaml.v3 v3.0.1 // indirect
3232
)

go.sum

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,19 @@
11
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
22
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
3-
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
43
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
5-
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
64
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
75
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
8-
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
9-
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
10-
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
11-
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
12-
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
6+
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
7+
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
8+
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
9+
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
1310
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
1411
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
1512
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1613
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
1714
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
18-
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 h1:pUa4ghanp6q4IJHwE9RwLgmVFfReJN+KbQ8ExNEUUoQ=
19-
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
15+
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
16+
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
2017
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
2118
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
2219
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@@ -30,17 +27,16 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
3027
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
3128
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
3229
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
33-
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
34-
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
35-
github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c=
36-
github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8=
30+
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
31+
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
32+
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
33+
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
3734
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
3835
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
3936
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
4037
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
4138
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
4239
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
43-
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
4440
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
4541
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
4642
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
@@ -51,22 +47,18 @@ github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
5147
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
5248
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
5349
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
54-
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
55-
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
56-
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
57-
golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
58-
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
59-
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
60-
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
61-
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
62-
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
63-
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
64-
golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc=
65-
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
66-
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
50+
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
51+
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
52+
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
53+
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
54+
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
55+
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
56+
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
57+
golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw=
58+
golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
59+
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
6760
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
6861
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
6962
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
70-
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
7163
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
7264
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

pkg/stream/client.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -981,17 +981,16 @@ func (c *Client) DeclareSubscriber(streamName string,
981981
consumer.MessagesHandler(ConsumerContext{Consumer: consumer, chunkInfo: &chunk}, offMessage.message)
982982
}
983983
if consumer.options.autocommit {
984-
consumer.messageCountBeforeStorage += 1
985-
if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage {
984+
messageCountBeforeStorage := consumer.increaseMessageCountBeforeStorage()
985+
if messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage ||
986+
time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval {
986987
consumer.cacheStoreOffset()
987-
consumer.messageCountBeforeStorage = 0
988988
}
989989
}
990990
}
991991

992992
case <-time.After(consumer.options.autoCommitStrategy.flushInterval):
993993
consumer.cacheStoreOffset()
994-
995994
}
996995
}
997996
}()

pkg/stream/consumer.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ type Consumer struct {
3636
// is in waiting mode or not.
3737
// in normal mode, the consumer is always isPromotedAsActive==true
3838
isPromotedAsActive bool
39+
40+
// lastAutoCommitStored tracks when the offset was last flushed
41+
lastAutoCommitStored time.Time
3942
}
4043

4144
func (consumer *Consumer) setStatus(status int) {
@@ -359,13 +362,31 @@ func (consumer *Consumer) Close() error {
359362

360363
func (consumer *Consumer) cacheStoreOffset() {
361364
if consumer.options.autocommit {
365+
consumer.mutex.Lock()
366+
consumer.lastAutoCommitStored = time.Now()
367+
consumer.messageCountBeforeStorage = 0
368+
consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutex, so not using defer for unlock
369+
362370
err := consumer.internalStoreOffset()
363371
if err != nil {
364372
logs.LogError("cache Store Offset error : %s", err)
365373
}
366374
}
367375
}
368376

377+
func (consumer *Consumer) increaseMessageCountBeforeStorage() int {
378+
consumer.mutex.Lock()
379+
defer consumer.mutex.Unlock()
380+
consumer.messageCountBeforeStorage += 1
381+
return consumer.messageCountBeforeStorage
382+
}
383+
384+
func (consumer *Consumer) getLastAutoCommitStored() time.Time {
385+
consumer.mutex.Lock()
386+
defer consumer.mutex.Unlock()
387+
return consumer.lastAutoCommitStored
388+
}
389+
369390
func (consumer *Consumer) StoreOffset() error {
370391
return consumer.internalStoreOffset()
371392
}

pkg/stream/consumer_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,36 @@ var _ = Describe("Streaming Consumers", func() {
282282

283283
})
284284

285+
It("commit at flush interval with constant stream of incoming messages", func() {
286+
producer, err := env.NewProducer(streamName, nil)
287+
Expect(err).NotTo(HaveOccurred())
288+
289+
var messagesReceived int32 = 0
290+
consumer, err := env.NewConsumer(streamName,
291+
func(consumerContext ConsumerContext, message *amqp.Message) {
292+
atomic.AddInt32(&messagesReceived, 1)
293+
}, NewConsumerOptions().
294+
SetAutoCommit(NewAutoCommitStrategy().
295+
SetCountBeforeStorage(10000000).
296+
SetFlushInterval(time.Second)))
297+
Expect(err).NotTo(HaveOccurred())
298+
299+
maxMessages := 50
300+
for i := 0; i < maxMessages; i++ {
301+
Expect(producer.Send(CreateMessageForTesting("", i))).NotTo(HaveOccurred())
302+
// emit message before the flush interval has elapsed
303+
time.Sleep(time.Millisecond * 100)
304+
305+
if consumer.GetLastStoredOffset() > 0 {
306+
break
307+
}
308+
}
309+
310+
Expect(messagesReceived > 5 && messagesReceived < int32(maxMessages)).To(BeTrueBecause("%d messages received", messagesReceived))
311+
Expect(producer.Close()).NotTo(HaveOccurred())
312+
Expect(consumer.Close()).NotTo(HaveOccurred())
313+
})
314+
285315
It("Deduplication", func() {
286316
producerName := "producer-ded"
287317
producer, err := env.NewProducer(streamName, NewProducerOptions().SetProducerName(producerName))

pkg/stream/coordinator.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,14 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
198198
defer coordinator.mutex.Unlock()
199199
var lastId, _ = coordinator.getNextConsumerItem()
200200
var item = &Consumer{ID: lastId, options: parameters,
201-
response: newResponse(lookUpCommand(commandSubscribe)),
202-
status: open,
203-
mutex: &sync.Mutex{},
204-
MessagesHandler: messagesHandler,
205-
currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled
206-
lastStoredOffset: -1, // because 0 is a valid value for the offset
207-
isPromotedAsActive: true,
201+
response: newResponse(lookUpCommand(commandSubscribe)),
202+
status: open,
203+
mutex: &sync.Mutex{},
204+
MessagesHandler: messagesHandler,
205+
currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled
206+
lastStoredOffset: -1, // because 0 is a valid value for the offset
207+
isPromotedAsActive: true,
208+
lastAutoCommitStored: time.Now(),
208209
}
209210

210211
coordinator.consumers[lastId] = item

0 commit comments

Comments
 (0)