Skip to content

Provide errors describing Admin API #445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.16.0 (Unreleased)
- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
- [Enhancement] Provide `Rrdkafka::Admin#describe_errors` to get errors descriptions (mensfeld)
- [Enhancement] Replace time poll based wait engine with an event based to improve response times on blocking operations and wait (nijikon + mensfeld)
- [Enhancement] Allow for usage of the second regex engine of librdkafka by setting `RDKAFKA_DISABLE_REGEX_EXT` during build (mensfeld)
- [Enhancement] name polling Thread as `rdkafka.native_kafka#<name>` (nijikon)
Expand Down
44 changes: 44 additions & 0 deletions lib/rdkafka/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,50 @@ module Rdkafka
class Admin
include Helpers::OAuth

class << self
# Allows us to retrieve librdkafka errors with descriptions
# Useful for debugging and building UIs, etc.
#
# @return [Hash<Integer, Hash>] hash with errors mapped by code
def describe_errors
# Memory pointers for the array of structures and count
p_error_descs = FFI::MemoryPointer.new(:pointer)
p_count = FFI::MemoryPointer.new(:size_t)

# Call the attached function
Bindings.rd_kafka_get_err_descs(p_error_descs, p_count)

# Retrieve the number of items in the array
count = p_count.read_uint

# Get the pointer to the array of error descriptions
array_of_errors = FFI::Pointer.new(Bindings::NativeErrorDesc, p_error_descs.read_pointer)

errors = {}

count.times do |i|
# Get the pointer to each struct
error_ptr = array_of_errors[i]

# Create a new instance of NativeErrorDesc for each item
error_desc = Bindings::NativeErrorDesc.new(error_ptr)

# Read values from the struct
code = error_desc[:code]

name = ''
desc = ''

name = error_desc[:name].read_string unless error_desc[:name].null?
desc = error_desc[:desc].read_string unless error_desc[:desc].null?

errors[code] = { code: code, name: name, description: desc }
end

errors
end
end

# @private
def initialize(native_kafka)
@native_kafka = native_kafka
Expand Down
6 changes: 6 additions & 0 deletions lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,15 @@ class TopicPartitionList < FFI::Struct
attach_function :rd_kafka_topic_partition_list_copy, [:pointer], :pointer

# Errors
class NativeErrorDesc < FFI::Struct
layout :code, :int,
:name, :pointer,
:desc, :pointer
end

attach_function :rd_kafka_err2name, [:int], :string
attach_function :rd_kafka_err2str, [:int], :string
attach_function :rd_kafka_get_err_descs, [:pointer, :pointer], :void

# Configuration

Expand Down
8 changes: 8 additions & 0 deletions spec/rdkafka/admin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
let(:operation) {Rdkafka::Bindings::RD_KAFKA_ACL_OPERATION_READ}
let(:permission_type) {Rdkafka::Bindings::RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW}

describe '#describe_errors' do
let(:errors) { admin.class.describe_errors }

it { expect(errors.size).to eq(162) }
it { expect(errors[-184]).to eq(code: -184, description: 'Local: Queue full', name: '_QUEUE_FULL') }
it { expect(errors[21]).to eq(code: 21, description: 'Broker: Invalid required acks value', name: 'INVALID_REQUIRED_ACKS') }
end

describe 'admin without auto-start' do
let(:admin) { config.admin(native_kafka_auto_start: false) }

Expand Down