Skip to content

Commit ad2ae04

Browse files
committed
Add manual commit
Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
1 parent 85d1327 commit ad2ae04

File tree

2 files changed

+12
-1
lines changed

2 files changed

+12
-1
lines changed

examples/offsetTracking/offsetTracking.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func main() {
7171
streamName,
7272
handleMessages,
7373
stream.NewConsumerOptions().
74+
SetManualCommit(). // disable auto commit
7475
SetConsumerName("my_consumer"). // set a consumer name
7576
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
7677
CheckErr(err)

pkg/stream/super_stream_consumer.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type SuperStreamConsumerOptions struct {
1515
SingleActiveConsumer *SingleActiveConsumer
1616
ConsumerName string
1717
AutoCommitStrategy *AutoCommitStrategy
18+
Autocommit bool
1819
}
1920

2021
func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions {
@@ -49,10 +50,16 @@ func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *Super
4950
}
5051

5152
func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions {
53+
s.Autocommit = true
5254
s.AutoCommitStrategy = autoCommitStrategy
5355
return s
5456
}
5557

58+
func (s *SuperStreamConsumerOptions) SetManualCommit() *SuperStreamConsumerOptions {
59+
s.Autocommit = false
60+
return s
61+
}
62+
5663
// CPartitionClose is a struct that is used to notify the user when a partition from a consumer is closed
5764
// The user can use the NotifyPartitionClose to get the channel
5865
type CPartitionClose struct {
@@ -167,8 +174,11 @@ func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSp
167174
}
168175

169176
options = options.SetFilter(s.SuperStreamConsumerOptions.Filter)
170-
if s.SuperStreamConsumerOptions.AutoCommitStrategy != nil {
177+
178+
if s.SuperStreamConsumerOptions.Autocommit {
171179
options = options.SetAutoCommit(s.SuperStreamConsumerOptions.AutoCommitStrategy)
180+
} else {
181+
options = options.SetManualCommit()
172182
}
173183

174184
if s.SuperStreamConsumerOptions.SingleActiveConsumer != nil {

0 commit comments

Comments
 (0)