Skip to content

Commit 6735d55

Browse files
committed
add recover
1 parent 81a4a6c commit 6735d55

File tree

1 file changed

+7
-0
lines changed

1 file changed

+7
-0
lines changed

pkg/kafka/producer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ func (p *AsyncProducer) SendData(topic string, multiData ...interface{}) error {
183183

184184
// handleResponse handles the response of async producer, if producer message failed, you can handle it, e.g. add to other queue to handle later.
185185
func (p *AsyncProducer) handleResponse(handleFn AsyncSendFailedHandlerFn) {
186+
defer func() {
187+
if e := recover(); e != nil {
188+
p.zapLogger.Error("panic occurred while processing async message", zap.Any("error", e))
189+
p.handleResponse(handleFn)
190+
}
191+
}()
192+
186193
for {
187194
select {
188195
case pm := <-p.Producer.Successes():

0 commit comments

Comments
 (0)