-
Notifications
You must be signed in to change notification settings - Fork 688
Add log channel to admin client #1448
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ | |
t.Skipf("Missing testconf.json") | ||
} | ||
|
||
conf := ConfigMap{"bootstrap.servers": testconf.Brokers} | ||
if err := conf.updateFromTestconf(); err != nil { | ||
t.Fatalf("Failed to update test configuration: %v\n", err) | ||
} | ||
|
@@ -1336,3 +1336,135 @@ | |
|
||
a.Close() | ||
} | ||
|
||
func TestAdminClientLog(t *testing.T) { | ||
logsChan := make(chan LogEvent, 100) | ||
|
||
admin, err := NewAdminClient(&ConfigMap{ | ||
"debug": "all", | ||
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs | ||
"go.logs.channel.enable": true, | ||
"go.logs.channel": logsChan, | ||
}) | ||
if err != nil { | ||
t.Fatalf("Failed to create AdminClient: %v", err) | ||
} | ||
|
||
defer func() { | ||
admin.Close() | ||
close(logsChan) | ||
}() | ||
|
||
// Verify that Logs() method returns the correct channel | ||
if admin.Logs() != logsChan { | ||
t.Fatalf("Expected admin.Logs() %v == logsChan %v", admin.Logs(), logsChan) | ||
} | ||
|
||
expectedLogs := map[struct { | ||
tag string | ||
message string | ||
}]bool{ | ||
{"INIT", "librdkafka"}: false, | ||
} | ||
|
||
go func() { | ||
for { | ||
select { | ||
case log, ok := <-logsChan: | ||
if !ok { | ||
return | ||
} | ||
|
||
t.Log(log.String()) | ||
|
||
for expectedLog, found := range expectedLogs { | ||
if found { | ||
continue | ||
} | ||
if log.Tag != expectedLog.tag { | ||
continue | ||
} | ||
if strings.Contains(log.Message, expectedLog.message) { | ||
expectedLogs[expectedLog] = true | ||
} | ||
} | ||
} | ||
} | ||
}() | ||
|
||
<-time.After(time.Second * 5) | ||
|
||
for expectedLog, found := range expectedLogs { | ||
if !found { | ||
t.Errorf( | ||
"Expected to find log with tag `%s' and message containing `%s',"+ | ||
" but didn't find any.", | ||
expectedLog.tag, | ||
expectedLog.message) | ||
} | ||
} | ||
} | ||
|
||
func TestAdminClientLogWithoutChannel(t *testing.T) { | ||
admin, err := NewAdminClient(&ConfigMap{ | ||
"debug": "all", | ||
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs | ||
"go.logs.channel.enable": true, | ||
// Note: no "go.logs.channel" specified | ||
}) | ||
if err != nil { | ||
t.Fatalf("Failed to create AdminClient: %v", err) | ||
} | ||
|
||
defer admin.Close() | ||
|
||
// Verify that Logs() method returns a channel when enabled but no channel provided | ||
logsChan := admin.Logs() | ||
if logsChan == nil { | ||
t.Fatalf("Expected admin.Logs() to return a channel, got nil") | ||
} | ||
|
||
expectedLogs := map[struct { | ||
tag string | ||
message string | ||
}]bool{ | ||
{"INIT", "librdkafka"}: false, | ||
} | ||
|
||
go func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nitpick] Consider adding an explicit termination condition in the goroutine reading from logsChan to avoid potential goroutine leaks when no more log events are sent. For example, signal closure of logsChan when the test completes. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented in acdb44b |
||
for { | ||
select { | ||
case log, ok := <-logsChan: | ||
if !ok { | ||
return | ||
} | ||
|
||
t.Log(log.String()) | ||
|
||
for expectedLog, found := range expectedLogs { | ||
if found { | ||
continue | ||
} | ||
if log.Tag != expectedLog.tag { | ||
continue | ||
} | ||
if strings.Contains(log.Message, expectedLog.message) { | ||
expectedLogs[expectedLog] = true | ||
} | ||
} | ||
} | ||
} | ||
}() | ||
|
||
<-time.After(time.Second * 5) | ||
|
||
for expectedLog, found := range expectedLogs { | ||
if !found { | ||
t.Errorf( | ||
"Expected to find log with tag `%s' and message containing `%s',"+ | ||
" but didn't find any.", | ||
expectedLog.tag, | ||
expectedLog.message) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a Logs() method onto the AdminClient, too, which returns a.handle.logs, to be used in case
"go.logs.channel.enable": true,
but"go.logs.channel": nil,
or unset.After this, if
handle.closeLogsChan
is true, then also close the logs channel here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @milindl for review
Addressed feedback in commit 7245731