Skip to content

Commit bfdb0a2

Browse files
authored
Provide configs describe API (#447)
1 parent 5d9df23 commit bfdb0a2

11 files changed

+397
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Rdkafka Changelog
22

33
## 0.16.0 (Unreleased)
4+
- **[Feature]** Introduce ability to discover cluster and topic configuration.
45
- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
56
- [Enhancement] Provide `Rrdkafka::Admin#describe_errors` to get errors descriptions (mensfeld)
67
- [Enhancement] Replace time poll based wait engine with an event based to improve response times on blocking operations and wait (nijikon + mensfeld)

lib/rdkafka.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424
require "rdkafka/admin/delete_acl_report"
2525
require "rdkafka/admin/describe_acl_handle"
2626
require "rdkafka/admin/describe_acl_report"
27+
require "rdkafka/admin/describe_configs_handle"
28+
require "rdkafka/admin/describe_configs_report"
2729
require "rdkafka/admin/acl_binding_result"
30+
require "rdkafka/admin/config_binding_result"
31+
require "rdkafka/admin/config_resource_binding_result"
2832
require "rdkafka/bindings"
2933
require "rdkafka/callbacks"
3034
require "rdkafka/config"

lib/rdkafka/admin.rb

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,84 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip
664664
describe_acl_handle
665665
end
666666

667+
668+
# Describe configs
669+
#
670+
# @param resource_type - values of type rd_kafka_ResourceType_t that support configs
671+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
672+
# valid values are:
673+
# RD_KAFKA_RESOURCE_TOPIC = 2
674+
# RD_KAFKA_RESOURCE_BROKER = 4
675+
# @param resources [Array<Hash>] Array where elements are hashes with two keys:
676+
# - `:resource_type` - numerical resource type based on Kafka API
677+
# - `:resource_name` - string with resource name
678+
# @return [DescribeConfigsHandle] Describe config handle that can be used to wait for the
679+
# result of fetching resources with their appropriate configs
680+
#
681+
# @raise [RdkafkaError]
682+
#
683+
# @note Several resources can be requested at one go, but only one broker at a time
684+
def describe_configs(resources)
685+
closed_admin_check(__method__)
686+
687+
handle = DescribeConfigsHandle.new
688+
handle[:pending] = true
689+
handle[:response] = -1
690+
691+
queue_ptr = @native_kafka.with_inner do |inner|
692+
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
693+
end
694+
695+
if queue_ptr.null?
696+
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
697+
end
698+
699+
admin_options_ptr = @native_kafka.with_inner do |inner|
700+
Rdkafka::Bindings.rd_kafka_AdminOptions_new(
701+
inner,
702+
Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS
703+
)
704+
end
705+
706+
DescribeConfigsHandle.register(handle)
707+
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, handle.to_ptr)
708+
709+
pointer_array = resources.map do |resource_details|
710+
Rdkafka::Bindings.rd_kafka_ConfigResource_new(
711+
resource_details.fetch(:resource_type),
712+
FFI::MemoryPointer.from_string(
713+
resource_details.fetch(:resource_name)
714+
)
715+
)
716+
end
717+
718+
configs_array_ptr = FFI::MemoryPointer.new(:pointer, pointer_array.size)
719+
configs_array_ptr.write_array_of_pointer(pointer_array)
720+
721+
begin
722+
@native_kafka.with_inner do |inner|
723+
Rdkafka::Bindings.rd_kafka_DescribeConfigs(
724+
inner,
725+
configs_array_ptr,
726+
pointer_array.size,
727+
admin_options_ptr,
728+
queue_ptr
729+
)
730+
end
731+
rescue Exception
732+
DescribeConfigsHandle.remove(handle.to_ptr.address)
733+
734+
raise
735+
ensure
736+
Rdkafka::Bindings.rd_kafka_ConfigResource_destroy_array(
737+
configs_array_ptr,
738+
pointer_array.size
739+
) if configs_array_ptr
740+
end
741+
742+
handle
743+
end
744+
667745
private
668746

669747
def closed_admin_check(method)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
# A single config binding result that represents its values extracted from C
6+
class ConfigBindingResult
7+
attr_reader :name, :value, :read_only, :default, :sensitive, :synonym, :synonyms
8+
9+
# @param config_ptr [FFI::Pointer] config pointer
10+
def initialize(config_ptr)
11+
@name = Bindings.rd_kafka_ConfigEntry_name(config_ptr)
12+
@value = Bindings.rd_kafka_ConfigEntry_value(config_ptr)
13+
@read_only = Bindings.rd_kafka_ConfigEntry_is_read_only(config_ptr)
14+
@default = Bindings.rd_kafka_ConfigEntry_is_default(config_ptr)
15+
@sensitive = Bindings.rd_kafka_ConfigEntry_is_sensitive(config_ptr)
16+
@synonym = Bindings.rd_kafka_ConfigEntry_is_synonym(config_ptr)
17+
@synonyms = []
18+
19+
# The code below builds up the config synonyms using same config binding
20+
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
21+
synonym_ptr = Bindings.rd_kafka_ConfigEntry_synonyms(config_ptr, pointer_to_size_t)
22+
synonyms_ptr = synonym_ptr.read_array_of_pointer(pointer_to_size_t.read_int)
23+
24+
(1..pointer_to_size_t.read_int).map do |ar|
25+
self.class.new synonyms_ptr[ar - 1]
26+
end
27+
end
28+
end
29+
end
30+
end
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
# A simple binding that represents the requested config resource
6+
class ConfigResourceBindingResult
7+
attr_reader :name, :type, :configs, :configs_count
8+
9+
def initialize(config_resource_ptr)
10+
ffi_binding = Bindings::ConfigResource.new(config_resource_ptr)
11+
12+
@name = ffi_binding[:name]
13+
@type = ffi_binding[:type]
14+
@configs = []
15+
end
16+
end
17+
end
18+
end

lib/rdkafka/admin/describe_acl_report.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class DescribeAclReport
1010

1111
def initialize(acls:, acls_count:)
1212
@acls=[]
13+
1314
if acls != FFI::Pointer::NULL
1415
acl_binding_result_pointers = acls.read_array_of_pointer(acls_count)
1516
(1..acls_count).map do |acl_index|
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
class DescribeConfigsHandle < AbstractHandle
6+
layout :pending, :bool,
7+
:response, :int,
8+
:response_string, :pointer,
9+
:config_entries, :pointer,
10+
:entry_count, :int
11+
12+
# @return [String] the name of the operation.
13+
def operation_name
14+
"describe configs"
15+
end
16+
17+
# @return [DescribeAclReport] instance with an array of acls that matches the request filters.
18+
def create_result
19+
DescribeConfigsReport.new(
20+
config_entries: self[:config_entries],
21+
entry_count: self[:entry_count]
22+
)
23+
end
24+
25+
def raise_error
26+
raise RdkafkaError.new(
27+
self[:response],
28+
broker_message: self[:response_string].read_string
29+
)
30+
end
31+
end
32+
end
33+
end
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
class DescribeConfigsReport
6+
attr_reader :resources
7+
8+
def initialize(config_entries:, entry_count:)
9+
@resources=[]
10+
11+
return if config_entries == FFI::Pointer::NULL
12+
13+
config_entries
14+
.read_array_of_pointer(entry_count)
15+
.each { |config_resource_result_ptr| validate!(config_resource_result_ptr) }
16+
.each do |config_resource_result_ptr|
17+
config_resource_result = ConfigResourceBindingResult.new(config_resource_result_ptr)
18+
19+
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
20+
configs_ptr = Bindings.rd_kafka_ConfigResource_configs(
21+
config_resource_result_ptr,
22+
pointer_to_size_t
23+
)
24+
25+
configs_ptr
26+
.read_array_of_pointer(pointer_to_size_t.read_int)
27+
.map { |config_ptr| ConfigBindingResult.new(config_ptr) }
28+
.each { |config_binding| config_resource_result.configs << config_binding }
29+
30+
@resources << config_resource_result
31+
end
32+
ensure
33+
return if config_entries == FFI::Pointer::NULL
34+
35+
Bindings.rd_kafka_ConfigResource_destroy_array(config_entries, entry_count)
36+
end
37+
38+
private
39+
40+
def validate!(config_resource_result_ptr)
41+
code = Bindings.rd_kafka_ConfigResource_error(config_resource_result_ptr)
42+
43+
return if code.zero?
44+
45+
raise(
46+
RdkafkaError.new(
47+
code,
48+
nil,
49+
broker_message: Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr)
50+
)
51+
)
52+
end
53+
end
54+
end
55+
end

lib/rdkafka/bindings.rb

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,36 @@ class TopicPartitionList < FFI::Struct
8989
attach_function :rd_kafka_topic_partition_list_destroy, [:pointer], :void
9090
attach_function :rd_kafka_topic_partition_list_copy, [:pointer], :pointer
9191

92+
# Configs management
93+
#
94+
# Structs for management of configurations
95+
# Each configuration is attached to a resource and one resource can have many configuration
96+
# details. Each resource will also have separate errors results if obtaining configuration
97+
# was not possible for any reason
98+
class ConfigResource < FFI::Struct
99+
layout :type, :int,
100+
:name, :string
101+
end
102+
103+
attach_function :rd_kafka_DescribeConfigs, [:pointer, :pointer, :size_t, :pointer, :pointer], :void, blocking: true
104+
attach_function :rd_kafka_ConfigResource_new, [:int32, :pointer], :pointer
105+
attach_function :rd_kafka_ConfigResource_destroy_array, [:pointer, :int32], :void
106+
attach_function :rd_kafka_event_DescribeConfigs_result, [:pointer], :pointer
107+
attach_function :rd_kafka_DescribeConfigs_result_resources, [:pointer, :pointer], :pointer
108+
attach_function :rd_kafka_ConfigResource_configs, [:pointer, :pointer], :pointer
109+
attach_function :rd_kafka_ConfigEntry_name, [:pointer], :string
110+
attach_function :rd_kafka_ConfigEntry_value, [:pointer], :string
111+
attach_function :rd_kafka_ConfigEntry_is_read_only, [:pointer], :int
112+
attach_function :rd_kafka_ConfigEntry_is_default, [:pointer], :int
113+
attach_function :rd_kafka_ConfigEntry_is_sensitive, [:pointer], :int
114+
attach_function :rd_kafka_ConfigEntry_is_synonym, [:pointer], :int
115+
attach_function :rd_kafka_ConfigEntry_synonyms, [:pointer, :pointer], :pointer
116+
attach_function :rd_kafka_ConfigResource_error, [:pointer], :int
117+
attach_function :rd_kafka_ConfigResource_error_string, [:pointer], :string
118+
119+
RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS = 5
120+
RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT = 104
121+
92122
# Errors
93123
class NativeErrorDesc < FFI::Struct
94124
layout :code, :int,

lib/rdkafka/callbacks.rb

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,24 @@ def initialize(event_ptr)
113113
end
114114
end
115115

116+
class DescribeConfigResult
117+
attr_reader :result_error, :error_string, :results, :results_count
118+
119+
def initialize(event_ptr)
120+
@results=[]
121+
@result_error = Rdkafka::Bindings.rd_kafka_event_error(event_ptr)
122+
@error_string = Rdkafka::Bindings.rd_kafka_event_error_string(event_ptr)
123+
124+
if @result_error == 0
125+
acl_describe_result = Rdkafka::Bindings.rd_kafka_event_DescribeConfigs_result(event_ptr)
126+
# Get the number of matching acls
127+
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
128+
@results = Rdkafka::Bindings.rd_kafka_DescribeConfigs_result_resources(acl_describe_result, pointer_to_size_t)
129+
@results_count = pointer_to_size_t.read_int
130+
end
131+
end
132+
end
133+
116134
# FFI Function used for Create Topic and Delete Topic callbacks
117135
BackgroundEventCallbackFunction = FFI::Function.new(
118136
:void, [:pointer, :pointer, :pointer]
@@ -123,20 +141,22 @@ def initialize(event_ptr)
123141
# @private
124142
class BackgroundEventCallback
125143
def self.call(_, event_ptr, _)
126-
event_type = Rdkafka::Bindings.rd_kafka_event_type(event_ptr)
127-
if event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_CREATETOPICS_RESULT
144+
case Rdkafka::Bindings.rd_kafka_event_type(event_ptr)
145+
when Rdkafka::Bindings::RD_KAFKA_EVENT_CREATETOPICS_RESULT
128146
process_create_topic(event_ptr)
129-
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT
147+
when Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT
148+
process_describe_configs(event_ptr)
149+
when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT
130150
process_delete_topic(event_ptr)
131-
elsif event_type == Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT
151+
when Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT
132152
process_create_partitions(event_ptr)
133-
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT
153+
when Rdkafka::Bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT
134154
process_create_acl(event_ptr)
135-
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT
155+
when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT
136156
process_delete_acl(event_ptr)
137-
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT
157+
when Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT
138158
process_describe_acl(event_ptr)
139-
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT
159+
when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT
140160
process_delete_groups(event_ptr)
141161
end
142162
end
@@ -161,6 +181,24 @@ def self.process_create_topic(event_ptr)
161181
end
162182
end
163183

184+
def self.process_describe_configs(event_ptr)
185+
describe_configs = DescribeConfigResult.new(event_ptr)
186+
describe_configs_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
187+
188+
if describe_configs_handle = Rdkafka::Admin::DescribeConfigsHandle.remove(describe_configs_handle_ptr.address)
189+
describe_configs_handle[:response] = describe_configs.result_error
190+
describe_configs_handle[:response_string] = describe_configs.error_string
191+
describe_configs_handle[:pending] = false
192+
193+
if describe_configs.result_error == 0
194+
describe_configs_handle[:config_entries] = describe_configs.results
195+
describe_configs_handle[:entry_count] = describe_configs.results_count
196+
end
197+
198+
describe_configs_handle.unlock
199+
end
200+
end
201+
164202
def self.process_delete_groups(event_ptr)
165203
delete_groups_result = Rdkafka::Bindings.rd_kafka_event_DeleteGroups_result(event_ptr)
166204

@@ -263,7 +301,7 @@ def self.process_describe_acl(event_ptr)
263301
describe_acl_handle[:response_string] = describe_acl.error_string
264302

265303
if describe_acl.result_error == 0
266-
describe_acl_handle[:acls] = describe_acl.matching_acls
304+
describe_acl_handle[:acls] = describe_acl.matching_acls
267305
describe_acl_handle[:acls_count] = describe_acl.matching_acls_count
268306
end
269307

0 commit comments

Comments
 (0)