Skip to content

Provide configs describe API #447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Rdkafka Changelog

## 0.16.0 (Unreleased)
- **[Feature]** Introduce ability to discover cluster and topic configuration.
- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
- [Enhancement] Provide `Rrdkafka::Admin#describe_errors` to get errors descriptions (mensfeld)
- [Enhancement] Replace time poll based wait engine with an event based to improve response times on blocking operations and wait (nijikon + mensfeld)
Expand Down
4 changes: 4 additions & 0 deletions lib/rdkafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
require "rdkafka/admin/delete_acl_report"
require "rdkafka/admin/describe_acl_handle"
require "rdkafka/admin/describe_acl_report"
require "rdkafka/admin/describe_configs_handle"
require "rdkafka/admin/describe_configs_report"
require "rdkafka/admin/acl_binding_result"
require "rdkafka/admin/config_binding_result"
require "rdkafka/admin/config_resource_binding_result"
require "rdkafka/bindings"
require "rdkafka/callbacks"
require "rdkafka/config"
Expand Down
78 changes: 78 additions & 0 deletions lib/rdkafka/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,84 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip
describe_acl_handle
end


# Describe configs
#
# @param resource_type - values of type rd_kafka_ResourceType_t that support configs
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
# valid values are:
# RD_KAFKA_RESOURCE_TOPIC = 2
# RD_KAFKA_RESOURCE_BROKER = 4
# @param resources [Array<Hash>] Array where elements are hashes with two keys:
# - `:resource_type` - numerical resource type based on Kafka API
# - `:resource_name` - string with resource name
# @return [DescribeConfigsHandle] Describe config handle that can be used to wait for the
# result of fetching resources with their appropriate configs
#
# @raise [RdkafkaError]
#
# @note Several resources can be requested at one go, but only one broker at a time
def describe_configs(resources)
closed_admin_check(__method__)

handle = DescribeConfigsHandle.new
handle[:pending] = true
handle[:response] = -1

queue_ptr = @native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
end

if queue_ptr.null?
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
end

admin_options_ptr = @native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_AdminOptions_new(
inner,
Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS
)
end

DescribeConfigsHandle.register(handle)
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, handle.to_ptr)

pointer_array = resources.map do |resource_details|
Rdkafka::Bindings.rd_kafka_ConfigResource_new(
resource_details.fetch(:resource_type),
FFI::MemoryPointer.from_string(
resource_details.fetch(:resource_name)
)
)
end

configs_array_ptr = FFI::MemoryPointer.new(:pointer, pointer_array.size)
configs_array_ptr.write_array_of_pointer(pointer_array)

begin
@native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_DescribeConfigs(
inner,
configs_array_ptr,
pointer_array.size,
admin_options_ptr,
queue_ptr
)
end
rescue Exception
DescribeConfigsHandle.remove(handle.to_ptr.address)

raise
ensure
Rdkafka::Bindings.rd_kafka_ConfigResource_destroy_array(
configs_array_ptr,
pointer_array.size
) if configs_array_ptr
end

handle
end

private

def closed_admin_check(method)
Expand Down
30 changes: 30 additions & 0 deletions lib/rdkafka/admin/config_binding_result.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

module Rdkafka
class Admin
# A single config binding result that represents its values extracted from C
class ConfigBindingResult
attr_reader :name, :value, :read_only, :default, :sensitive, :synonym, :synonyms

# @param config_ptr [FFI::Pointer] config pointer
def initialize(config_ptr)
@name = Bindings.rd_kafka_ConfigEntry_name(config_ptr)
@value = Bindings.rd_kafka_ConfigEntry_value(config_ptr)
@read_only = Bindings.rd_kafka_ConfigEntry_is_read_only(config_ptr)
@default = Bindings.rd_kafka_ConfigEntry_is_default(config_ptr)
@sensitive = Bindings.rd_kafka_ConfigEntry_is_sensitive(config_ptr)
@synonym = Bindings.rd_kafka_ConfigEntry_is_synonym(config_ptr)
@synonyms = []

# The code below builds up the config synonyms using same config binding
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
synonym_ptr = Bindings.rd_kafka_ConfigEntry_synonyms(config_ptr, pointer_to_size_t)
synonyms_ptr = synonym_ptr.read_array_of_pointer(pointer_to_size_t.read_int)

(1..pointer_to_size_t.read_int).map do |ar|
self.class.new synonyms_ptr[ar - 1]
end
end
end
end
end
18 changes: 18 additions & 0 deletions lib/rdkafka/admin/config_resource_binding_result.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module Rdkafka
class Admin
# A simple binding that represents the requested config resource
class ConfigResourceBindingResult
attr_reader :name, :type, :configs, :configs_count

def initialize(config_resource_ptr)
ffi_binding = Bindings::ConfigResource.new(config_resource_ptr)

@name = ffi_binding[:name]
@type = ffi_binding[:type]
@configs = []
end
end
end
end
1 change: 1 addition & 0 deletions lib/rdkafka/admin/describe_acl_report.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class DescribeAclReport

def initialize(acls:, acls_count:)
@acls=[]

if acls != FFI::Pointer::NULL
acl_binding_result_pointers = acls.read_array_of_pointer(acls_count)
(1..acls_count).map do |acl_index|
Expand Down
33 changes: 33 additions & 0 deletions lib/rdkafka/admin/describe_configs_handle.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Rdkafka
class Admin
class DescribeConfigsHandle < AbstractHandle
layout :pending, :bool,
:response, :int,
:response_string, :pointer,
:config_entries, :pointer,
:entry_count, :int

# @return [String] the name of the operation.
def operation_name
"describe configs"
end

# @return [DescribeAclReport] instance with an array of acls that matches the request filters.
def create_result
DescribeConfigsReport.new(
config_entries: self[:config_entries],
entry_count: self[:entry_count]
)
end

def raise_error
raise RdkafkaError.new(
self[:response],
broker_message: self[:response_string].read_string
)
end
end
end
end
55 changes: 55 additions & 0 deletions lib/rdkafka/admin/describe_configs_report.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

module Rdkafka
class Admin
class DescribeConfigsReport
attr_reader :resources

def initialize(config_entries:, entry_count:)
@resources=[]

return if config_entries == FFI::Pointer::NULL

config_entries
.read_array_of_pointer(entry_count)
.each { |config_resource_result_ptr| validate!(config_resource_result_ptr) }
.each do |config_resource_result_ptr|
config_resource_result = ConfigResourceBindingResult.new(config_resource_result_ptr)

pointer_to_size_t = FFI::MemoryPointer.new(:int32)
configs_ptr = Bindings.rd_kafka_ConfigResource_configs(
config_resource_result_ptr,
pointer_to_size_t
)

configs_ptr
.read_array_of_pointer(pointer_to_size_t.read_int)
.map { |config_ptr| ConfigBindingResult.new(config_ptr) }
.each { |config_binding| config_resource_result.configs << config_binding }

@resources << config_resource_result
end
ensure
return if config_entries == FFI::Pointer::NULL

Bindings.rd_kafka_ConfigResource_destroy_array(config_entries, entry_count)
end

private

def validate!(config_resource_result_ptr)
code = Bindings.rd_kafka_ConfigResource_error(config_resource_result_ptr)

return if code.zero?

raise(
RdkafkaError.new(
code,
nil,
broker_message: Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr)
)
)
end
end
end
end
30 changes: 30 additions & 0 deletions lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,36 @@ class TopicPartitionList < FFI::Struct
attach_function :rd_kafka_topic_partition_list_destroy, [:pointer], :void
attach_function :rd_kafka_topic_partition_list_copy, [:pointer], :pointer

# Configs management
#
# Structs for management of configurations
# Each configuration is attached to a resource and one resource can have many configuration
# details. Each resource will also have separate errors results if obtaining configuration
# was not possible for any reason
class ConfigResource < FFI::Struct
layout :type, :int,
:name, :string
end

attach_function :rd_kafka_DescribeConfigs, [:pointer, :pointer, :size_t, :pointer, :pointer], :void, blocking: true
attach_function :rd_kafka_ConfigResource_new, [:int32, :pointer], :pointer
attach_function :rd_kafka_ConfigResource_destroy_array, [:pointer, :int32], :void
attach_function :rd_kafka_event_DescribeConfigs_result, [:pointer], :pointer
attach_function :rd_kafka_DescribeConfigs_result_resources, [:pointer, :pointer], :pointer
attach_function :rd_kafka_ConfigResource_configs, [:pointer, :pointer], :pointer
attach_function :rd_kafka_ConfigEntry_name, [:pointer], :string
attach_function :rd_kafka_ConfigEntry_value, [:pointer], :string
attach_function :rd_kafka_ConfigEntry_is_read_only, [:pointer], :int
attach_function :rd_kafka_ConfigEntry_is_default, [:pointer], :int
attach_function :rd_kafka_ConfigEntry_is_sensitive, [:pointer], :int
attach_function :rd_kafka_ConfigEntry_is_synonym, [:pointer], :int
attach_function :rd_kafka_ConfigEntry_synonyms, [:pointer, :pointer], :pointer
attach_function :rd_kafka_ConfigResource_error, [:pointer], :int
attach_function :rd_kafka_ConfigResource_error_string, [:pointer], :string

RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS = 5
RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT = 104

# Errors
class NativeErrorDesc < FFI::Struct
layout :code, :int,
Expand Down
56 changes: 47 additions & 9 deletions lib/rdkafka/callbacks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,24 @@ def initialize(event_ptr)
end
end

class DescribeConfigResult
attr_reader :result_error, :error_string, :results, :results_count

def initialize(event_ptr)
@results=[]
@result_error = Rdkafka::Bindings.rd_kafka_event_error(event_ptr)
@error_string = Rdkafka::Bindings.rd_kafka_event_error_string(event_ptr)

if @result_error == 0
acl_describe_result = Rdkafka::Bindings.rd_kafka_event_DescribeConfigs_result(event_ptr)
# Get the number of matching acls
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
@results = Rdkafka::Bindings.rd_kafka_DescribeConfigs_result_resources(acl_describe_result, pointer_to_size_t)
@results_count = pointer_to_size_t.read_int
end
end
end

# FFI Function used for Create Topic and Delete Topic callbacks
BackgroundEventCallbackFunction = FFI::Function.new(
:void, [:pointer, :pointer, :pointer]
Expand All @@ -123,20 +141,22 @@ def initialize(event_ptr)
# @private
class BackgroundEventCallback
def self.call(_, event_ptr, _)
event_type = Rdkafka::Bindings.rd_kafka_event_type(event_ptr)
if event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_CREATETOPICS_RESULT
case Rdkafka::Bindings.rd_kafka_event_type(event_ptr)
when Rdkafka::Bindings::RD_KAFKA_EVENT_CREATETOPICS_RESULT
process_create_topic(event_ptr)
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT
when Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT
process_describe_configs(event_ptr)
when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT
process_delete_topic(event_ptr)
elsif event_type == Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT
when Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT
process_create_partitions(event_ptr)
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT
when Rdkafka::Bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT
process_create_acl(event_ptr)
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT
when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT
process_delete_acl(event_ptr)
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT
when Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT
process_describe_acl(event_ptr)
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT
when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT
process_delete_groups(event_ptr)
end
end
Expand All @@ -161,6 +181,24 @@ def self.process_create_topic(event_ptr)
end
end

def self.process_describe_configs(event_ptr)
describe_configs = DescribeConfigResult.new(event_ptr)
describe_configs_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

if describe_configs_handle = Rdkafka::Admin::DescribeConfigsHandle.remove(describe_configs_handle_ptr.address)
describe_configs_handle[:response] = describe_configs.result_error
describe_configs_handle[:response_string] = describe_configs.error_string
describe_configs_handle[:pending] = false

if describe_configs.result_error == 0
describe_configs_handle[:config_entries] = describe_configs.results
describe_configs_handle[:entry_count] = describe_configs.results_count
end

describe_configs_handle.unlock
end
end

def self.process_delete_groups(event_ptr)
delete_groups_result = Rdkafka::Bindings.rd_kafka_event_DeleteGroups_result(event_ptr)

Expand Down Expand Up @@ -263,7 +301,7 @@ def self.process_describe_acl(event_ptr)
describe_acl_handle[:response_string] = describe_acl.error_string

if describe_acl.result_error == 0
describe_acl_handle[:acls] = describe_acl.matching_acls
describe_acl_handle[:acls] = describe_acl.matching_acls
describe_acl_handle[:acls_count] = describe_acl.matching_acls_count
end

Expand Down
Loading