Skip to content

Commit 8510c80

Browse files
feat: add support for producing kakfa messages asynchronously
1 parent 7912e94 commit 8510c80

File tree

1 file changed

+32
-1
lines changed

1 file changed

+32
-1
lines changed

exporter/kafkaexporter/exporter.go

+32-1
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,28 @@ type MessageSender interface {
1919
SendMessages(msgs []*sarama.ProducerMessage) error
2020
}
2121

22+
type AsyncMessageSender struct {
23+
sarama.AsyncProducer
24+
logger *fflog.FFLogger
25+
}
26+
27+
func (a *AsyncMessageSender) SendMessages(msgs []*sarama.ProducerMessage) error {
28+
for len(msgs) > 0 {
29+
select {
30+
case err := <-a.AsyncProducer.Errors():
31+
a.logger.Warn("Failed to produce message: %w", err)
32+
case a.AsyncProducer.Input() <- msgs[0]:
33+
msgs = msgs[1:]
34+
}
35+
}
36+
return nil
37+
}
38+
2239
// Settings contains Kafka-specific configurations needed for message creation
2340
type Settings struct {
2441
Topic string `json:"topic"`
2542
Addresses []string `json:"addresses"`
43+
Async bool `json:"async"`
2644
*sarama.Config
2745
}
2846

@@ -41,18 +59,24 @@ type Exporter struct {
4159
// dialer will create the producer. This field is added for dependency injection during testing as sarama
4260
// has the annoying tendency to dial as soon as a producer is created.
4361
dialer func(addrs []string, config *sarama.Config) (MessageSender, error)
62+
63+
logger *fflog.FFLogger
4464
}
4565

4666
// Export will produce a message to the Kafka topic. The message's value will contain the event encoded in the
4767
// selected format. Messages are published synchronously and will error immediately on failure.
48-
func (e *Exporter) Export(_ context.Context, _ *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error {
68+
func (e *Exporter) Export(_ context.Context, logger *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error {
4969
if e.sender == nil {
5070
err := e.initializeProducer()
5171
if err != nil {
5272
return fmt.Errorf("writer: %w", err)
5373
}
5474
}
5575

76+
if e.logger == nil {
77+
e.logger = logger
78+
}
79+
5680
messages := make([]*sarama.ProducerMessage, 0, len(featureEvents))
5781
for _, event := range featureEvents {
5882
data, err := e.formatMessage(event)
@@ -90,6 +114,13 @@ func (e *Exporter) initializeProducer() error {
90114
if e.dialer == nil {
91115
e.dialer = func(addrs []string, config *sarama.Config) (MessageSender, error) {
92116
// Adapter for the function to comply with the MessageSender interface return
117+
if e.Settings.Async {
118+
asyncProducer, err := sarama.NewAsyncProducer(addrs, config)
119+
if err != nil {
120+
return nil, err
121+
}
122+
return &AsyncMessageSender{AsyncProducer: asyncProducer, logger: e.logger}, nil //TODO Close should be called on shutdown
123+
}
93124
return sarama.NewSyncProducer(addrs, config)
94125
}
95126
}

0 commit comments

Comments
 (0)