Skip to content

Commit 5d9df23

Browse files
authored
provide error describing admin api (#445)
1 parent 2b49e89 commit 5d9df23

File tree

4 files changed

+59
-0
lines changed

4 files changed

+59
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## 0.16.0 (Unreleased)
44
- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
5+
- [Enhancement] Provide `Rrdkafka::Admin#describe_errors` to get errors descriptions (mensfeld)
56
- [Enhancement] Replace time poll based wait engine with an event based to improve response times on blocking operations and wait (nijikon + mensfeld)
67
- [Enhancement] Allow for usage of the second regex engine of librdkafka by setting `RDKAFKA_DISABLE_REGEX_EXT` during build (mensfeld)
78
- [Enhancement] name polling Thread as `rdkafka.native_kafka#<name>` (nijikon)

lib/rdkafka/admin.rb

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,50 @@ module Rdkafka
44
class Admin
55
include Helpers::OAuth
66

7+
class << self
8+
# Allows us to retrieve librdkafka errors with descriptions
9+
# Useful for debugging and building UIs, etc.
10+
#
11+
# @return [Hash<Integer, Hash>] hash with errors mapped by code
12+
def describe_errors
13+
# Memory pointers for the array of structures and count
14+
p_error_descs = FFI::MemoryPointer.new(:pointer)
15+
p_count = FFI::MemoryPointer.new(:size_t)
16+
17+
# Call the attached function
18+
Bindings.rd_kafka_get_err_descs(p_error_descs, p_count)
19+
20+
# Retrieve the number of items in the array
21+
count = p_count.read_uint
22+
23+
# Get the pointer to the array of error descriptions
24+
array_of_errors = FFI::Pointer.new(Bindings::NativeErrorDesc, p_error_descs.read_pointer)
25+
26+
errors = {}
27+
28+
count.times do |i|
29+
# Get the pointer to each struct
30+
error_ptr = array_of_errors[i]
31+
32+
# Create a new instance of NativeErrorDesc for each item
33+
error_desc = Bindings::NativeErrorDesc.new(error_ptr)
34+
35+
# Read values from the struct
36+
code = error_desc[:code]
37+
38+
name = ''
39+
desc = ''
40+
41+
name = error_desc[:name].read_string unless error_desc[:name].null?
42+
desc = error_desc[:desc].read_string unless error_desc[:desc].null?
43+
44+
errors[code] = { code: code, name: name, description: desc }
45+
end
46+
47+
errors
48+
end
49+
end
50+
751
# @private
852
def initialize(native_kafka)
953
@native_kafka = native_kafka

lib/rdkafka/bindings.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,15 @@ class TopicPartitionList < FFI::Struct
9090
attach_function :rd_kafka_topic_partition_list_copy, [:pointer], :pointer
9191

9292
# Errors
93+
class NativeErrorDesc < FFI::Struct
94+
layout :code, :int,
95+
:name, :pointer,
96+
:desc, :pointer
97+
end
9398

9499
attach_function :rd_kafka_err2name, [:int], :string
95100
attach_function :rd_kafka_err2str, [:int], :string
101+
attach_function :rd_kafka_get_err_descs, [:pointer, :pointer], :void
96102

97103
# Configuration
98104

spec/rdkafka/admin_spec.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@
3131
let(:operation) {Rdkafka::Bindings::RD_KAFKA_ACL_OPERATION_READ}
3232
let(:permission_type) {Rdkafka::Bindings::RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW}
3333

34+
describe '#describe_errors' do
35+
let(:errors) { admin.class.describe_errors }
36+
37+
it { expect(errors.size).to eq(162) }
38+
it { expect(errors[-184]).to eq(code: -184, description: 'Local: Queue full', name: '_QUEUE_FULL') }
39+
it { expect(errors[21]).to eq(code: 21, description: 'Broker: Invalid required acks value', name: 'INVALID_REQUIRED_ACKS') }
40+
end
41+
3442
describe 'admin without auto-start' do
3543
let(:admin) { config.admin(native_kafka_auto_start: false) }
3644

0 commit comments

Comments
 (0)