diff --git a/CHANGELOG.md b/CHANGELOG.md index 483c8aab5..52d2796a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Confluent's Golang client for Apache Kafka +## v2.12.0 + +This is a feature release: + +confluent-kafka-go is based on librdkafka v2.12.0, see the +[librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) +for a complete list of changes, enhancements, fixes and upgrade considerations. + +### Enhancements + +* Add support for forwarding librdkafka log events to a Go channel in AdminClient (#1448) + ## v2.11.0 This is a feature release: @@ -8,7 +20,6 @@ confluent-kafka-go is based on librdkafka v2.11.0, see the [librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for a complete list of changes, enhancements, fixes and upgrade considerations. - ## v2.10.1 This is a maintenance release: diff --git a/kafka/adminapi.go b/kafka/adminapi.go index f1ed663f9..3bcc68c5d 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -160,6 +160,7 @@ type AdminClient struct { handle *handle isDerived bool // Derived from existing client handle isClosed uint32 // to check if Admin Client is closed or not. + adminTermChan chan bool // For log channel termination } // IsClosed returns boolean representing if client is closed or not @@ -2737,6 +2738,9 @@ func (a *AdminClient) Close() { return } + if a.adminTermChan != nil { + close(a.adminTermChan) + } a.handle.cleanup() C.rd_kafka_destroy(a.handle.rk) @@ -3724,9 +3728,19 @@ func NewAdminClient(conf *ConfigMap) (*AdminClient, error) { a := &AdminClient{} a.handle = &handle{} + a.isClosed = 0 + + // before we do anything with the configuration, create a copy such that + // the original is not mutated. + confCopy := conf.clone() + + logsChanEnable, logsChan, err := confCopy.extractLogConfig() + if err != nil { + return nil, err + } // Convert ConfigMap to librdkafka conf_t - cConf, err := conf.convert() + cConf, err := confCopy.convert() if err != nil { return nil, err } @@ -3746,7 +3760,11 @@ func NewAdminClient(conf *ConfigMap) (*AdminClient, error) { a.isDerived = false a.handle.setup() - a.isClosed = 0 + // Setup log channel if enabled + if logsChanEnable { + a.adminTermChan = make(chan bool) + a.handle.setupLogQueue(logsChan, a.adminTermChan) + } return a, nil } @@ -3778,3 +3796,8 @@ func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) { a.isClosed = 0 return a, nil } + +// Logs returns the log channel if enabled, or nil otherwise. +func (a *AdminClient) Logs() chan LogEvent { + return a.handle.logs +} diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index b835de515..f1efd3925 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -1336,3 +1336,135 @@ func TestAdminAPIs(t *testing.T) { a.Close() } + +func TestAdminClientLog(t *testing.T) { + logsChan := make(chan LogEvent, 100) + + admin, err := NewAdminClient(&ConfigMap{ + "debug": "all", + "bootstrap.servers": "localhost:65533", // Unreachable to generate logs + "go.logs.channel.enable": true, + "go.logs.channel": logsChan, + }) + if err != nil { + t.Fatalf("Failed to create AdminClient: %v", err) + } + + defer func() { + admin.Close() + close(logsChan) + }() + + // Verify that Logs() method returns the correct channel + if admin.Logs() != logsChan { + t.Fatalf("Expected admin.Logs() %v == logsChan %v", admin.Logs(), logsChan) + } + + expectedLogs := map[struct { + tag string + message string + }]bool{ + {"INIT", "librdkafka"}: false, + } + + go func() { + for { + select { + case log, ok := <-logsChan: + if !ok { + return + } + + t.Log(log.String()) + + for expectedLog, found := range expectedLogs { + if found { + continue + } + if log.Tag != expectedLog.tag { + continue + } + if strings.Contains(log.Message, expectedLog.message) { + expectedLogs[expectedLog] = true + } + } + } + } + }() + + <-time.After(time.Second * 5) + + for expectedLog, found := range expectedLogs { + if !found { + t.Errorf( + "Expected to find log with tag `%s' and message containing `%s',"+ + " but didn't find any.", + expectedLog.tag, + expectedLog.message) + } + } +} + +func TestAdminClientLogWithoutChannel(t *testing.T) { + admin, err := NewAdminClient(&ConfigMap{ + "debug": "all", + "bootstrap.servers": "localhost:65533", // Unreachable to generate logs + "go.logs.channel.enable": true, + // Note: no "go.logs.channel" specified + }) + if err != nil { + t.Fatalf("Failed to create AdminClient: %v", err) + } + + defer admin.Close() + + // Verify that Logs() method returns a channel when enabled but no channel provided + logsChan := admin.Logs() + if logsChan == nil { + t.Fatalf("Expected admin.Logs() to return a channel, got nil") + } + + expectedLogs := map[struct { + tag string + message string + }]bool{ + {"INIT", "librdkafka"}: false, + } + + go func() { + for { + select { + case log, ok := <-logsChan: + if !ok { + return + } + + t.Log(log.String()) + + for expectedLog, found := range expectedLogs { + if found { + continue + } + if log.Tag != expectedLog.tag { + continue + } + if strings.Contains(log.Message, expectedLog.message) { + expectedLogs[expectedLog] = true + } + } + } + } + }() + + <-time.After(time.Second * 5) + + for expectedLog, found := range expectedLogs { + if !found { + t.Errorf( + "Expected to find log with tag `%s' and message containing `%s',"+ + " but didn't find any.", + expectedLog.tag, + expectedLog.message) + } + } +} \ No newline at end of file