Skip to content

Commit fc704e7

Browse files
feat: add support for producing kakfa messages asynchronously
1 parent 7912e94 commit fc704e7

File tree

1 file changed

+20
-0
lines changed

1 file changed

+20
-0
lines changed

exporter/kafkaexporter/exporter.go

+20
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,22 @@ type MessageSender interface {
1919
SendMessages(msgs []*sarama.ProducerMessage) error
2020
}
2121

22+
type AsyncMessageSender struct {
23+
sarama.AsyncProducer
24+
}
25+
26+
func (a *AsyncMessageSender) SendMessages(msgs []*sarama.ProducerMessage) error {
27+
for _, msg := range msgs {
28+
a.AsyncProducer.Input() <- msg
29+
}
30+
return nil
31+
}
32+
2233
// Settings contains Kafka-specific configurations needed for message creation
2334
type Settings struct {
2435
Topic string `json:"topic"`
2536
Addresses []string `json:"addresses"`
37+
Async bool `json:"async"`
2638
*sarama.Config
2739
}
2840

@@ -90,6 +102,14 @@ func (e *Exporter) initializeProducer() error {
90102
if e.dialer == nil {
91103
e.dialer = func(addrs []string, config *sarama.Config) (MessageSender, error) {
92104
// Adapter for the function to comply with the MessageSender interface return
105+
if e.Settings.Async {
106+
asyncProducer, err := sarama.NewAsyncProducer(addrs, config)
107+
if err != nil {
108+
return nil, err
109+
}
110+
e.Settings.Config.Producer.Return.Errors = false //TODO these should be read
111+
return &AsyncMessageSender{AsyncProducer: asyncProducer}, nil
112+
}
93113
return sarama.NewSyncProducer(addrs, config)
94114
}
95115
}

0 commit comments

Comments
 (0)