@@ -288,6 +288,7 @@ public void checkAndEnqueueWhenSendFailed(RecordMetadata recordMetadata, Excepti
288288 Integer count = eventTriedCount .get (event .getLoggingAuditHeaders ());
289289 if (count == null ){
290290 eventTriedCount .put (event .getLoggingAuditHeaders (), 1 );
291+ insertEvent (event );
291292 OpenTsdbMetricConverter
292293 .gauge (LoggingAuditClientMetrics .AUDIT_CLIENT_SENDER_KAFKA_EVENTS_RETRIED ,
293294 eventTriedCount .size (), "host=" + host , "stage=" + stage .toString (),
@@ -301,19 +302,23 @@ public void checkAndEnqueueWhenSendFailed(RecordMetadata recordMetadata, Excepti
301302 eventTriedCount .remove (event .getLoggingAuditHeaders ());
302303 } else {
303304 eventTriedCount .put (event .getLoggingAuditHeaders (), count + 1 );
304- try {
305- boolean success = queue .offerFirst (event , 3 , TimeUnit .SECONDS );
306- if (!success ) {
307- LOG .debug ("Failed to enqueue LoggingAuditEvent at head of the queue when executing "
308- + "producer send callback. Drop this event." );
309- eventTriedCount .remove (event .getLoggingAuditHeaders ());
310- }
311- } catch (InterruptedException ex ) {
312- LOG .debug (
313- "Enqueuing LoggingAuditEvent at head of the queue was interrupted in callback. "
314- + "Drop this event" );
315- eventTriedCount .remove (event .getLoggingAuditHeaders ());
316- }
305+ insertEvent (event );
306+ }
307+ }
308+
309+ public void insertEvent (LoggingAuditEvent event ){
310+ try {
311+ boolean success = queue .offerFirst (event , 3 , TimeUnit .SECONDS );
312+ if (!success ) {
313+ LOG .debug ("Failed to enqueue LoggingAuditEvent at head of the queue when executing "
314+ + "producer send callback. Drop this event." );
315+ eventTriedCount .remove (event .getLoggingAuditHeaders ());
316+ }
317+ } catch (InterruptedException ex ) {
318+ LOG .debug (
319+ "Enqueuing LoggingAuditEvent at head of the queue was interrupted in callback. "
320+ + "Drop this event" );
321+ eventTriedCount .remove (event .getLoggingAuditHeaders ());
317322 }
318323 }
319324
0 commit comments