Skip to content

Add log channel to admin client #1448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

k-raina
Copy link
Member

@k-raina k-raina commented Jun 27, 2025

Summary

This PR adds support for forwarding librdkafka log events to a Go channel in the AdminClient, similar to the existing functionality in the Producer and Consumer clients. This allows users to programmatically consume and process log events generated by the underlying librdkafka instance used by the AdminClient.

Motivation

Changes

  • AdminClient struct: Added an adminTermChan field to manage the lifecycle of the log polling goroutine.
  • NewAdminClient:
    • Accepts go.logs.channel.enable and go.logs.channel in the ConfigMap.
    • If enabled, sets up a log channel and starts a goroutine to forward log events.
  • Close method: Ensures the log polling goroutine is properly terminated when the AdminClient is closed.

Test

Unit Test
  • Added a test to verify that log events are received as expected.
Manual test
  • Test Code:
func main() {
	_, version := kafka.LibraryVersion()
	fmt.Printf("librdkafka version: %s\n", version)

	logsChan := make(chan kafka.LogEvent, 100)

	admin, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers":      "localhost:9092", // Change as needed
		"debug":                  "all",
		"go.logs.channel.enable": true,
		"go.logs.channel":        logsChan,
	})
	if err != nil {
		fmt.Printf("Failed to create AdminClient: %v\n", err)
		return
	}
	defer admin.Close()

	fmt.Println("AdminClient created. Listening for log events... (Ctrl+C to exit)")

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)

	go func() {
		for log := range logsChan {
			fmt.Printf("%s INFO kafka admin - LogEvent {\n", log.Timestamp.Format("15:04:05.00000"))
			fmt.Printf("  Name: '%s',\n", log.Name)
			fmt.Printf("  Tag: '%s',\n", log.Tag)
			fmt.Printf("  Message: '%s',\n", log.Message)
			fmt.Printf("  Level: %d,\n", log.Level)
			fmt.Printf("  Timestamp: Time {\n")
			fmt.Printf("   wall: %d,\n", log.Timestamp.UnixNano())
			fmt.Printf("   ext: %d,\n", log.Timestamp.Unix())
			fmt.Printf("   loc: %s\n", log.Timestamp.Location())
			fmt.Printf("  }\n")
			fmt.Printf(" } \n\n")
		}
	}()

	// Optionally, perform an admin operation to generate some logs
	_, _ = admin.GetMetadata(nil, true, 1000)

	<-sigchan
	fmt.Println("Exiting...")
} 
  • Output
15:57:12.20329 INFO kafka admin - LogEvent {
  Name: 'rdkafka#producer-1',
  Tag: 'SEND',
  Message: '[thrd:localhost:9092/1]: localhost:9092/1: Sent MetadataRequest (v12, 26 bytes @ 0, CorrId 2)',
  Level: 7,
  Timestamp: Time {
   wall: 1751020032203293000,
   ext: 1751020032,
   loc: Local
  }
 } 

15:57:12.20335 INFO kafka admin - LogEvent {
  Name: 'rdkafka#producer-1',
  Tag: 'METADATA',
  Message: '[thrd:app]: localhost:9092/1: Request metadata for all topics: application requested',
  Level: 7,
  Timestamp: Time {
   wall: 1751020032203356000,
   ext: 1751020032,
   loc: Local
  }
 } 

15:57:12.20341 INFO kafka admin - LogEvent {
  Name: 'rdkafka#producer-1',
  Tag: 'SETBROKER',
  Message: '[thrd:main]: Setting telemetry broker to localhost:9092/1
',
  Level: 7,
  Timestamp: Time {
   wall: 1751020032203415000,
   ext: 1751020032,
   loc: Local
  }
 } 

15:57:12.20347 INFO kafka admin - LogEvent {
  Name: 'rdkafka#producer-1',
  Tag: 'SEND',
  Message: '[thrd:localhost:9092/1]: localhost:9092/1: Sent MetadataRequest (v12, 29 bytes @ 0, CorrId 3)',
  Level: 7,
  Timestamp: Time {
   wall: 1751020032203475000,
   ext: 1751020032,
   loc: Local
  }
 } 

15:57:12.20351 INFO kafka admin - LogEvent {
  Name: 'rdkafka#producer-1',
  Tag: 'GETSUBSCRIPTIONS',
  Message: '[thrd:main]: Sending GetTelemetryRequest',
  Level: 7,
  Timestamp: Time {
   wall: 1751020032203517000,
   ext: 1751020032,
   loc: Local
  }
 } 

@Copilot Copilot AI review requested due to automatic review settings June 27, 2025 10:34
@k-raina k-raina requested review from a team as code owners June 27, 2025 10:34
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for forwarding librdkafka log events to a Go channel in the AdminClient, enabling programmatic log consumption for improved observability.

  • Adds a log channel setup in NewAdminClient and cleans it up in Close.
  • Introduces a new unit test (TestAdminClientLog) verifying the log event forwarding functionality.

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
kafka/adminapi_test.go Added unit test to validate log event delivery.
kafka/adminapi.go Added support for log channel configuration and termination.
Comments suppressed due to low confidence (1)

kafka/adminapi.go:3733

  • [nitpick] Consider using a channel of type struct{} for termination signals instead of bool to better convey that only a signal is needed.
	// before we do anything with the configuration, create a copy such that

{"INIT", "librdkafka"}: false,
}

go func() {
Copy link
Preview

Copilot AI Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider adding an explicit termination condition in the goroutine reading from logsChan to avoid potential goroutine leaks when no more log events are sent. For example, signal closure of logsChan when the test completes.

Copilot uses AI. Check for mistakes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in acdb44b

@k-raina k-raina self-assigned this Jun 27, 2025
Copy link
Contributor

@milindl milindl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one change, and add a CHANGELOG.md entry, keep the version of the entry as 2.12 for now

@@ -2737,6 +2738,9 @@ func (a *AdminClient) Close() {
return
}

if a.adminTermChan != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a Logs() method onto the AdminClient, too, which returns a.handle.logs, to be used in case "go.logs.channel.enable": true, but "go.logs.channel": nil, or unset.

After this, if handle.closeLogsChan is true, then also close the logs channel here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @milindl for review
Addressed feedback in commit 7245731

@airlock-confluentinc airlock-confluentinc bot force-pushed the issue-1119-add-logchannel-adminClient branch from 7245731 to f9051be Compare July 11, 2025 13:19
@k-raina k-raina requested a review from milindl July 11, 2025 13:25
@sonarqube-confluent
Copy link

Passed

Analysis Details

3 Issues

  • Bug 0 Bugs
  • Vulnerability 0 Vulnerabilities
  • Code Smell 3 Code Smells

Coverage and Duplications

  • Coverage 93.30% Coverage (55.60% Estimated after merge)
  • Duplications No duplication information (0.10% Estimated after merge)

Project ID: confluent-kafka-go

View in SonarQube

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants