Skip to content

Commit 912b0d0

Browse files
committed
Issue-1119 : Add log channel to admin client
1 parent 734ea64 commit 912b0d0

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

kafka/adminapi.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ type AdminClient struct {
160160
handle *handle
161161
isDerived bool // Derived from existing client handle
162162
isClosed uint32 // to check if Admin Client is closed or not.
163+
adminTermChan chan bool // For log channel termination
163164
}
164165

165166
// IsClosed returns boolean representing if client is closed or not
@@ -2737,6 +2738,9 @@ func (a *AdminClient) Close() {
27372738
return
27382739
}
27392740

2741+
if a.adminTermChan != nil {
2742+
close(a.adminTermChan)
2743+
}
27402744
a.handle.cleanup()
27412745

27422746
C.rd_kafka_destroy(a.handle.rk)
@@ -3724,9 +3728,19 @@ func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
37243728

37253729
a := &AdminClient{}
37263730
a.handle = &handle{}
3731+
a.isClosed = 0
3732+
3733+
// before we do anything with the configuration, create a copy such that
3734+
// the original is not mutated.
3735+
confCopy := conf.clone()
3736+
3737+
logsChanEnable, logsChan, err := confCopy.extractLogConfig()
3738+
if err != nil {
3739+
return nil, err
3740+
}
37273741

37283742
// Convert ConfigMap to librdkafka conf_t
3729-
cConf, err := conf.convert()
3743+
cConf, err := confCopy.convert()
37303744
if err != nil {
37313745
return nil, err
37323746
}
@@ -3746,7 +3760,11 @@ func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
37463760
a.isDerived = false
37473761
a.handle.setup()
37483762

3749-
a.isClosed = 0
3763+
// Setup log channel if enabled
3764+
if logsChanEnable {
3765+
a.adminTermChan = make(chan bool)
3766+
a.handle.setupLogQueue(logsChan, a.adminTermChan)
3767+
}
37503768

37513769
return a, nil
37523770
}

kafka/adminapi_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,3 +1336,63 @@ func TestAdminAPIs(t *testing.T) {
13361336

13371337
a.Close()
13381338
}
1339+
1340+
func TestAdminClientLog(t *testing.T) {
1341+
logsChan := make(chan LogEvent, 100)
1342+
1343+
admin, err := NewAdminClient(&ConfigMap{
1344+
"debug": "all",
1345+
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs
1346+
"go.logs.channel.enable": true,
1347+
"go.logs.channel": logsChan,
1348+
})
1349+
if err != nil {
1350+
t.Fatalf("Failed to create AdminClient: %v", err)
1351+
}
1352+
1353+
defer admin.Close()
1354+
1355+
expectedLogs := map[struct {
1356+
tag string
1357+
message string
1358+
}]bool{
1359+
{"INIT", "librdkafka"}: false,
1360+
}
1361+
1362+
go func() {
1363+
for {
1364+
select {
1365+
case log, ok := <-logsChan:
1366+
if !ok {
1367+
return
1368+
}
1369+
1370+
t.Log(log.String())
1371+
1372+
for expectedLog, found := range expectedLogs {
1373+
if found {
1374+
continue
1375+
}
1376+
if log.Tag != expectedLog.tag {
1377+
continue
1378+
}
1379+
if strings.Contains(log.Message, expectedLog.message) {
1380+
expectedLogs[expectedLog] = true
1381+
}
1382+
}
1383+
}
1384+
}
1385+
}()
1386+
1387+
<-time.After(time.Second * 5)
1388+
1389+
for expectedLog, found := range expectedLogs {
1390+
if !found {
1391+
t.Errorf(
1392+
"Expected to find log with tag `%s' and message containing `%s',"+
1393+
" but didn't find any.",
1394+
expectedLog.tag,
1395+
expectedLog.message)
1396+
}
1397+
}
1398+
}

0 commit comments

Comments
 (0)