Skip to content

Commit f9051be

Browse files
committed
Address feedback
1 parent 672a68f commit f9051be

File tree

3 files changed

+86
-1
lines changed

3 files changed

+86
-1
lines changed

CHANGELOG.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
# Confluent's Golang client for Apache Kafka
22

3+
## v2.12.0
4+
5+
This is a feature release:
6+
7+
confluent-kafka-go is based on librdkafka v2.12.0, see the
8+
[librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0)
9+
for a complete list of changes, enhancements, fixes and upgrade considerations.
10+
11+
### Enhancements
12+
13+
* Add support for forwarding librdkafka log events to a Go channel in AdminClient (#1448)
14+
315
## v2.11.0
416

517
This is a feature release:
@@ -8,7 +20,6 @@ confluent-kafka-go is based on librdkafka v2.11.0, see the
820
[librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0)
921
for a complete list of changes, enhancements, fixes and upgrade considerations.
1022

11-
1223
## v2.10.1
1324

1425
This is a maintenance release:

kafka/adminapi.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3796,3 +3796,8 @@ func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) {
37963796
a.isClosed = 0
37973797
return a, nil
37983798
}
3799+
3800+
// Logs returns the log channel if enabled, or nil otherwise.
3801+
func (a *AdminClient) Logs() chan LogEvent {
3802+
return a.handle.logs
3803+
}

kafka/adminapi_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,6 +1355,75 @@ func TestAdminClientLog(t *testing.T) {
13551355
close(logsChan)
13561356
}()
13571357

1358+
// Verify that Logs() method returns the correct channel
1359+
if admin.Logs() != logsChan {
1360+
t.Fatalf("Expected admin.Logs() %v == logsChan %v", admin.Logs(), logsChan)
1361+
}
1362+
1363+
expectedLogs := map[struct {
1364+
tag string
1365+
message string
1366+
}]bool{
1367+
{"INIT", "librdkafka"}: false,
1368+
}
1369+
1370+
go func() {
1371+
for {
1372+
select {
1373+
case log, ok := <-logsChan:
1374+
if !ok {
1375+
return
1376+
}
1377+
1378+
t.Log(log.String())
1379+
1380+
for expectedLog, found := range expectedLogs {
1381+
if found {
1382+
continue
1383+
}
1384+
if log.Tag != expectedLog.tag {
1385+
continue
1386+
}
1387+
if strings.Contains(log.Message, expectedLog.message) {
1388+
expectedLogs[expectedLog] = true
1389+
}
1390+
}
1391+
}
1392+
}
1393+
}()
1394+
1395+
<-time.After(time.Second * 5)
1396+
1397+
for expectedLog, found := range expectedLogs {
1398+
if !found {
1399+
t.Errorf(
1400+
"Expected to find log with tag `%s' and message containing `%s',"+
1401+
" but didn't find any.",
1402+
expectedLog.tag,
1403+
expectedLog.message)
1404+
}
1405+
}
1406+
}
1407+
1408+
func TestAdminClientLogWithoutChannel(t *testing.T) {
1409+
admin, err := NewAdminClient(&ConfigMap{
1410+
"debug": "all",
1411+
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs
1412+
"go.logs.channel.enable": true,
1413+
// Note: no "go.logs.channel" specified
1414+
})
1415+
if err != nil {
1416+
t.Fatalf("Failed to create AdminClient: %v", err)
1417+
}
1418+
1419+
defer admin.Close()
1420+
1421+
// Verify that Logs() method returns a channel when enabled but no channel provided
1422+
logsChan := admin.Logs()
1423+
if logsChan == nil {
1424+
t.Fatalf("Expected admin.Logs() to return a channel, got nil")
1425+
}
1426+
13581427
expectedLogs := map[struct {
13591428
tag string
13601429
message string

0 commit comments

Comments
 (0)