Skip to content

Add log channel to admin client #1448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Confluent's Golang client for Apache Kafka

## v2.12.0

This is a feature release:

confluent-kafka-go is based on librdkafka v2.12.0, see the
[librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.

### Enhancements

* Add support for forwarding librdkafka log events to a Go channel in AdminClient (#1448)

## v2.11.0

This is a feature release:
Expand All @@ -8,7 +20,6 @@ confluent-kafka-go is based on librdkafka v2.11.0, see the
[librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v2.10.1

This is a maintenance release:
Expand Down
27 changes: 25 additions & 2 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type AdminClient struct {
handle *handle
isDerived bool // Derived from existing client handle
isClosed uint32 // to check if Admin Client is closed or not.
adminTermChan chan bool // For log channel termination
}

// IsClosed returns boolean representing if client is closed or not
Expand Down Expand Up @@ -2737,6 +2738,9 @@ func (a *AdminClient) Close() {
return
}

if a.adminTermChan != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a Logs() method onto the AdminClient, too, which returns a.handle.logs, to be used in case "go.logs.channel.enable": true, but "go.logs.channel": nil, or unset.

After this, if handle.closeLogsChan is true, then also close the logs channel here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @milindl for review
Addressed feedback in commit 7245731

close(a.adminTermChan)
}
a.handle.cleanup()

C.rd_kafka_destroy(a.handle.rk)
Expand Down Expand Up @@ -3724,9 +3728,19 @@ func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {

a := &AdminClient{}
a.handle = &handle{}
a.isClosed = 0

// before we do anything with the configuration, create a copy such that
// the original is not mutated.
confCopy := conf.clone()

logsChanEnable, logsChan, err := confCopy.extractLogConfig()
if err != nil {
return nil, err
}

// Convert ConfigMap to librdkafka conf_t
cConf, err := conf.convert()
cConf, err := confCopy.convert()
if err != nil {
return nil, err
}
Expand All @@ -3746,7 +3760,11 @@ func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
a.isDerived = false
a.handle.setup()

a.isClosed = 0
// Setup log channel if enabled
if logsChanEnable {
a.adminTermChan = make(chan bool)
a.handle.setupLogQueue(logsChan, a.adminTermChan)
}

return a, nil
}
Expand Down Expand Up @@ -3778,3 +3796,8 @@ func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) {
a.isClosed = 0
return a, nil
}

// Logs returns the log channel if enabled, or nil otherwise.
func (a *AdminClient) Logs() chan LogEvent {
return a.handle.logs
}
132 changes: 132 additions & 0 deletions kafka/adminapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
t.Skipf("Missing testconf.json")
}

conf := ConfigMap{"bootstrap.servers": testconf.Brokers}

Check failure on line 33 in kafka/adminapi_test.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

kafka/adminapi_test.go#L33

Define a constant instead of duplicating this literal "bootstrap.servers" 3 times.
if err := conf.updateFromTestconf(); err != nil {
t.Fatalf("Failed to update test configuration: %v\n", err)
}
Expand Down Expand Up @@ -1336,3 +1336,135 @@

a.Close()
}

func TestAdminClientLog(t *testing.T) {

Check failure on line 1340 in kafka/adminapi_test.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

kafka/adminapi_test.go#L1340

Refactor this method to reduce its Cognitive Complexity from 25 to the 15 allowed.
logsChan := make(chan LogEvent, 100)

admin, err := NewAdminClient(&ConfigMap{
"debug": "all",
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs
"go.logs.channel.enable": true,
"go.logs.channel": logsChan,
})
if err != nil {
t.Fatalf("Failed to create AdminClient: %v", err)
}

defer func() {
admin.Close()
close(logsChan)
}()

// Verify that Logs() method returns the correct channel
if admin.Logs() != logsChan {
t.Fatalf("Expected admin.Logs() %v == logsChan %v", admin.Logs(), logsChan)
}

expectedLogs := map[struct {
tag string
message string
}]bool{
{"INIT", "librdkafka"}: false,
}

go func() {
for {
select {
case log, ok := <-logsChan:
if !ok {
return
}

t.Log(log.String())

for expectedLog, found := range expectedLogs {
if found {
continue
}
if log.Tag != expectedLog.tag {
continue
}
if strings.Contains(log.Message, expectedLog.message) {
expectedLogs[expectedLog] = true
}
}
}
}
}()

<-time.After(time.Second * 5)

for expectedLog, found := range expectedLogs {
if !found {
t.Errorf(
"Expected to find log with tag `%s' and message containing `%s',"+
" but didn't find any.",
expectedLog.tag,
expectedLog.message)
}
}
}

func TestAdminClientLogWithoutChannel(t *testing.T) {

Check failure on line 1408 in kafka/adminapi_test.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

kafka/adminapi_test.go#L1408

Refactor this method to reduce its Cognitive Complexity from 25 to the 15 allowed.
admin, err := NewAdminClient(&ConfigMap{
"debug": "all",
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs
"go.logs.channel.enable": true,
// Note: no "go.logs.channel" specified
})
if err != nil {
t.Fatalf("Failed to create AdminClient: %v", err)
}

defer admin.Close()

// Verify that Logs() method returns a channel when enabled but no channel provided
logsChan := admin.Logs()
if logsChan == nil {
t.Fatalf("Expected admin.Logs() to return a channel, got nil")
}

expectedLogs := map[struct {
tag string
message string
}]bool{
{"INIT", "librdkafka"}: false,
}

go func() {
Copy link
Preview

Copilot AI Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider adding an explicit termination condition in the goroutine reading from logsChan to avoid potential goroutine leaks when no more log events are sent. For example, signal closure of logsChan when the test completes.

Copilot uses AI. Check for mistakes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in acdb44b

for {
select {
case log, ok := <-logsChan:
if !ok {
return
}

t.Log(log.String())

for expectedLog, found := range expectedLogs {
if found {
continue
}
if log.Tag != expectedLog.tag {
continue
}
if strings.Contains(log.Message, expectedLog.message) {
expectedLogs[expectedLog] = true
}
}
}
}
}()

<-time.After(time.Second * 5)

for expectedLog, found := range expectedLogs {
if !found {
t.Errorf(
"Expected to find log with tag `%s' and message containing `%s',"+
" but didn't find any.",
expectedLog.tag,
expectedLog.message)
}
}
}