Skip to content

Commit b737754

Browse files
authored
add a new metric named batchQueueTime to track the batch time cost (#1103)
1 parent 185381d commit b737754

File tree

1 file changed

+30
-25
lines changed

1 file changed

+30
-25
lines changed

writer.go

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,29 @@ import (
2727
// by the function and test if it an instance of kafka.WriteErrors in order to
2828
// identify which messages have succeeded or failed, for example:
2929
//
30-
// // Construct a synchronous writer (the default mode).
31-
// w := &kafka.Writer{
32-
// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
33-
// Topic: "topic-A",
34-
// RequiredAcks: kafka.RequireAll,
35-
// }
30+
// // Construct a synchronous writer (the default mode).
31+
// w := &kafka.Writer{
32+
// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
33+
// Topic: "topic-A",
34+
// RequiredAcks: kafka.RequireAll,
35+
// }
3636
//
37-
// ...
37+
// ...
3838
//
39-
// // Passing a context can prevent the operation from blocking indefinitely.
40-
// switch err := w.WriteMessages(ctx, msgs...).(type) {
41-
// case nil:
42-
// case kafka.WriteErrors:
43-
// for i := range msgs {
44-
// if err[i] != nil {
45-
// // handle the error writing msgs[i]
46-
// ...
39+
// // Passing a context can prevent the operation from blocking indefinitely.
40+
// switch err := w.WriteMessages(ctx, msgs...).(type) {
41+
// case nil:
42+
// case kafka.WriteErrors:
43+
// for i := range msgs {
44+
// if err[i] != nil {
45+
// // handle the error writing msgs[i]
46+
// ...
47+
// }
4748
// }
49+
// default:
50+
// // handle other errors
51+
// ...
4852
// }
49-
// default:
50-
// // handle other errors
51-
// ...
52-
// }
5353
//
5454
// In asynchronous mode, the program may configure a completion handler on the
5555
// writer to receive notifications of messages being written to kafka:
@@ -348,12 +348,13 @@ type WriterStats struct {
348348
Bytes int64 `metric:"kafka.writer.message.bytes" type:"counter"`
349349
Errors int64 `metric:"kafka.writer.error.count" type:"counter"`
350350

351-
BatchTime DurationStats `metric:"kafka.writer.batch.seconds"`
352-
WriteTime DurationStats `metric:"kafka.writer.write.seconds"`
353-
WaitTime DurationStats `metric:"kafka.writer.wait.seconds"`
354-
Retries int64 `metric:"kafka.writer.retries.count" type:"counter"`
355-
BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
356-
BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`
351+
BatchTime DurationStats `metric:"kafka.writer.batch.seconds"`
352+
BatchQueueTime DurationStats `metric:"kafka.writer.batch.queue.seconds"`
353+
WriteTime DurationStats `metric:"kafka.writer.write.seconds"`
354+
WaitTime DurationStats `metric:"kafka.writer.wait.seconds"`
355+
Retries int64 `metric:"kafka.writer.retries.count" type:"counter"`
356+
BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
357+
BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`
357358

358359
MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
359360
WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"`
@@ -398,6 +399,7 @@ type writerStats struct {
398399
errors counter
399400
dialTime summary
400401
batchTime summary
402+
batchQueueTime summary
401403
writeTime summary
402404
waitTime summary
403405
retries counter
@@ -880,6 +882,7 @@ func (w *Writer) Stats() WriterStats {
880882
Errors: stats.errors.snapshot(),
881883
DialTime: stats.dialTime.snapshotDuration(),
882884
BatchTime: stats.batchTime.snapshotDuration(),
885+
BatchQueueTime: stats.batchQueueTime.snapshotDuration(),
883886
WriteTime: stats.writeTime.snapshotDuration(),
884887
WaitTime: stats.waitTime.snapshotDuration(),
885888
Retries: stats.retries.snapshot(),
@@ -1088,6 +1091,8 @@ func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
10881091
// having it leak until it expires.
10891092
batch.timer.Stop()
10901093
}
1094+
stats := ptw.w.stats()
1095+
stats.batchQueueTime.observe(int64(time.Since(batch.time)))
10911096
}
10921097

10931098
func (ptw *partitionWriter) writeBatch(batch *writeBatch) {

0 commit comments

Comments
 (0)