Skip to content

Commit 1600315

Browse files
committed
main merge
2 parents f0b1477 + 4eb360f commit 1600315

15 files changed

+872
-17
lines changed

CHANGELOG.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
# Rdkafka Changelog
22

3-
## 0.15.2 (Unreleased)
3+
## 0.16.0 (Unreleased)
4+
- **[Feature]** Support incremental config describe + alter API.
45
- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
56
- **[Feature]** Provide ability to set topic config on a producer for custom behaviors.
67
- [Enhancement] Use topic config reference cache for messages production to prevent topic objects allocation with each message.
8+
- [Enhancement] Provide `Rrdkafka::Admin#describe_errors` to get errors descriptions (mensfeld)
79
- [Enhancement] Replace time poll based wait engine with an event based to improve response times on blocking operations and wait (nijikon + mensfeld)
810
- [Enhancement] Allow for usage of the second regex engine of librdkafka by setting `RDKAFKA_DISABLE_REGEX_EXT` during build (mensfeld)
11+
- [Enhancement] name polling Thread as `rdkafka.native_kafka#<name>` (nijikon)
912
- [Change] Allow for native kafka thread operations deferring and manual start for consumer, producer and admin.
1013
- [Change] The `wait_timeout` argument in `AbstractHandle.wait` method is deprecated and will be removed in future versions without replacement. We don't rely on it's value anymore (nijikon)
1114
- [Fix] Background logger stops working after forking causing memory leaks (mensfeld)
12-
- [Enhancement] name polling Thread as `rdkafka.native_kafka#<name>` (nijikon)
15+
- [Fix] Fix bogus case/when syntax. Levels 1, 2, and 6 previously defaulted to UNKNOWN (jjowdy)
1316

1417
## 0.15.1 (2024-01-30)
1518
- [Enhancement] Provide support for Nix OS (alexandriainfantino)

lib/rdkafka.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@
2424
require "rdkafka/admin/delete_acl_report"
2525
require "rdkafka/admin/describe_acl_handle"
2626
require "rdkafka/admin/describe_acl_report"
27+
require "rdkafka/admin/describe_configs_handle"
28+
require "rdkafka/admin/describe_configs_report"
29+
require "rdkafka/admin/incremental_alter_configs_handle"
30+
require "rdkafka/admin/incremental_alter_configs_report"
2731
require "rdkafka/admin/acl_binding_result"
32+
require "rdkafka/admin/config_binding_result"
33+
require "rdkafka/admin/config_resource_binding_result"
2834
require "rdkafka/bindings"
2935
require "rdkafka/callbacks"
3036
require "rdkafka/config"

lib/rdkafka/admin.rb

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,50 @@ module Rdkafka
44
class Admin
55
include Helpers::OAuth
66

7+
class << self
8+
# Allows us to retrieve librdkafka errors with descriptions
9+
# Useful for debugging and building UIs, etc.
10+
#
11+
# @return [Hash<Integer, Hash>] hash with errors mapped by code
12+
def describe_errors
13+
# Memory pointers for the array of structures and count
14+
p_error_descs = FFI::MemoryPointer.new(:pointer)
15+
p_count = FFI::MemoryPointer.new(:size_t)
16+
17+
# Call the attached function
18+
Bindings.rd_kafka_get_err_descs(p_error_descs, p_count)
19+
20+
# Retrieve the number of items in the array
21+
count = p_count.read_uint
22+
23+
# Get the pointer to the array of error descriptions
24+
array_of_errors = FFI::Pointer.new(Bindings::NativeErrorDesc, p_error_descs.read_pointer)
25+
26+
errors = {}
27+
28+
count.times do |i|
29+
# Get the pointer to each struct
30+
error_ptr = array_of_errors[i]
31+
32+
# Create a new instance of NativeErrorDesc for each item
33+
error_desc = Bindings::NativeErrorDesc.new(error_ptr)
34+
35+
# Read values from the struct
36+
code = error_desc[:code]
37+
38+
name = ''
39+
desc = ''
40+
41+
name = error_desc[:name].read_string unless error_desc[:name].null?
42+
desc = error_desc[:desc].read_string unless error_desc[:desc].null?
43+
44+
errors[code] = { code: code, name: name, description: desc }
45+
end
46+
47+
errors
48+
end
49+
end
50+
751
# @private
852
def initialize(native_kafka)
953
@native_kafka = native_kafka
@@ -620,6 +664,166 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip
620664
describe_acl_handle
621665
end
622666

667+
668+
# Describe configs
669+
#
670+
# @param resources [Array<Hash>] Array where elements are hashes with two keys:
671+
# - `:resource_type` - numerical resource type based on Kafka API
672+
# - `:resource_name` - string with resource name
673+
# @return [DescribeConfigsHandle] Describe config handle that can be used to wait for the
674+
# result of fetching resources with their appropriate configs
675+
#
676+
# @raise [RdkafkaError]
677+
#
678+
# @note Several resources can be requested at one go, but only one broker at a time
679+
def describe_configs(resources)
680+
closed_admin_check(__method__)
681+
682+
handle = DescribeConfigsHandle.new
683+
handle[:pending] = true
684+
handle[:response] = -1
685+
686+
queue_ptr = @native_kafka.with_inner do |inner|
687+
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
688+
end
689+
690+
if queue_ptr.null?
691+
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
692+
end
693+
694+
admin_options_ptr = @native_kafka.with_inner do |inner|
695+
Rdkafka::Bindings.rd_kafka_AdminOptions_new(
696+
inner,
697+
Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS
698+
)
699+
end
700+
701+
DescribeConfigsHandle.register(handle)
702+
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, handle.to_ptr)
703+
704+
pointer_array = resources.map do |resource_details|
705+
Rdkafka::Bindings.rd_kafka_ConfigResource_new(
706+
resource_details.fetch(:resource_type),
707+
FFI::MemoryPointer.from_string(
708+
resource_details.fetch(:resource_name)
709+
)
710+
)
711+
end
712+
713+
configs_array_ptr = FFI::MemoryPointer.new(:pointer, pointer_array.size)
714+
configs_array_ptr.write_array_of_pointer(pointer_array)
715+
716+
begin
717+
@native_kafka.with_inner do |inner|
718+
Rdkafka::Bindings.rd_kafka_DescribeConfigs(
719+
inner,
720+
configs_array_ptr,
721+
pointer_array.size,
722+
admin_options_ptr,
723+
queue_ptr
724+
)
725+
end
726+
rescue Exception
727+
DescribeConfigsHandle.remove(handle.to_ptr.address)
728+
729+
raise
730+
ensure
731+
Rdkafka::Bindings.rd_kafka_ConfigResource_destroy_array(
732+
configs_array_ptr,
733+
pointer_array.size
734+
) if configs_array_ptr
735+
end
736+
737+
handle
738+
end
739+
740+
# Alters in an incremental way all the configs provided for given resources
741+
#
742+
# @param resources_with_configs [Array<Hash>] resources with the configs key that contains
743+
# name, value and the proper op_type to perform on this value.
744+
#
745+
# @return [IncrementalAlterConfigsHandle] Incremental alter configs handle that can be used to
746+
# wait for the result of altering resources with their appropriate configs
747+
#
748+
# @raise [RdkafkaError]
749+
#
750+
# @note Several resources can be requested at one go, but only one broker at a time
751+
# @note The results won't contain altered values but only the altered resources
752+
def incremental_alter_configs(resources_with_configs)
753+
closed_admin_check(__method__)
754+
755+
handle = IncrementalAlterConfigsHandle.new
756+
handle[:pending] = true
757+
handle[:response] = -1
758+
759+
queue_ptr = @native_kafka.with_inner do |inner|
760+
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
761+
end
762+
763+
if queue_ptr.null?
764+
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
765+
end
766+
767+
admin_options_ptr = @native_kafka.with_inner do |inner|
768+
Rdkafka::Bindings.rd_kafka_AdminOptions_new(
769+
inner,
770+
Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS
771+
)
772+
end
773+
774+
IncrementalAlterConfigsHandle.register(handle)
775+
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, handle.to_ptr)
776+
777+
# Tu poprawnie tworzyc
778+
pointer_array = resources_with_configs.map do |resource_details|
779+
# First build the appropriate resource representation
780+
resource_ptr = Rdkafka::Bindings.rd_kafka_ConfigResource_new(
781+
resource_details.fetch(:resource_type),
782+
FFI::MemoryPointer.from_string(
783+
resource_details.fetch(:resource_name)
784+
)
785+
)
786+
787+
resource_details.fetch(:configs).each do |config|
788+
Bindings.rd_kafka_ConfigResource_add_incremental_config(
789+
resource_ptr,
790+
config.fetch(:name),
791+
config.fetch(:op_type),
792+
config.fetch(:value)
793+
)
794+
end
795+
796+
resource_ptr
797+
end
798+
799+
configs_array_ptr = FFI::MemoryPointer.new(:pointer, pointer_array.size)
800+
configs_array_ptr.write_array_of_pointer(pointer_array)
801+
802+
803+
begin
804+
@native_kafka.with_inner do |inner|
805+
Rdkafka::Bindings.rd_kafka_IncrementalAlterConfigs(
806+
inner,
807+
configs_array_ptr,
808+
pointer_array.size,
809+
admin_options_ptr,
810+
queue_ptr
811+
)
812+
end
813+
rescue Exception
814+
IncrementalAlterConfigsHandle.remove(handle.to_ptr.address)
815+
816+
raise
817+
ensure
818+
Rdkafka::Bindings.rd_kafka_ConfigResource_destroy_array(
819+
configs_array_ptr,
820+
pointer_array.size
821+
) if configs_array_ptr
822+
end
823+
824+
handle
825+
end
826+
623827
private
624828

625829
def closed_admin_check(method)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
# A single config binding result that represents its values extracted from C
6+
class ConfigBindingResult
7+
attr_reader :name, :value, :read_only, :default, :sensitive, :synonym, :synonyms
8+
9+
# @param config_ptr [FFI::Pointer] config pointer
10+
def initialize(config_ptr)
11+
@name = Bindings.rd_kafka_ConfigEntry_name(config_ptr)
12+
@value = Bindings.rd_kafka_ConfigEntry_value(config_ptr)
13+
@read_only = Bindings.rd_kafka_ConfigEntry_is_read_only(config_ptr)
14+
@default = Bindings.rd_kafka_ConfigEntry_is_default(config_ptr)
15+
@sensitive = Bindings.rd_kafka_ConfigEntry_is_sensitive(config_ptr)
16+
@synonym = Bindings.rd_kafka_ConfigEntry_is_synonym(config_ptr)
17+
@synonyms = []
18+
19+
# The code below builds up the config synonyms using same config binding
20+
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
21+
synonym_ptr = Bindings.rd_kafka_ConfigEntry_synonyms(config_ptr, pointer_to_size_t)
22+
synonyms_ptr = synonym_ptr.read_array_of_pointer(pointer_to_size_t.read_int)
23+
24+
(1..pointer_to_size_t.read_int).map do |ar|
25+
self.class.new synonyms_ptr[ar - 1]
26+
end
27+
end
28+
end
29+
end
30+
end
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
# A simple binding that represents the requested config resource
6+
class ConfigResourceBindingResult
7+
attr_reader :name, :type, :configs, :configs_count
8+
9+
def initialize(config_resource_ptr)
10+
ffi_binding = Bindings::ConfigResource.new(config_resource_ptr)
11+
12+
@name = ffi_binding[:name]
13+
@type = ffi_binding[:type]
14+
@configs = []
15+
end
16+
end
17+
end
18+
end

lib/rdkafka/admin/describe_acl_report.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class DescribeAclReport
1010

1111
def initialize(acls:, acls_count:)
1212
@acls=[]
13+
1314
if acls != FFI::Pointer::NULL
1415
acl_binding_result_pointers = acls.read_array_of_pointer(acls_count)
1516
(1..acls_count).map do |acl_index|
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
class DescribeConfigsHandle < AbstractHandle
6+
layout :pending, :bool,
7+
:response, :int,
8+
:response_string, :pointer,
9+
:config_entries, :pointer,
10+
:entry_count, :int
11+
12+
# @return [String] the name of the operation.
13+
def operation_name
14+
"describe configs"
15+
end
16+
17+
# @return [DescribeAclReport] instance with an array of acls that matches the request filters.
18+
def create_result
19+
DescribeConfigsReport.new(
20+
config_entries: self[:config_entries],
21+
entry_count: self[:entry_count]
22+
)
23+
end
24+
25+
def raise_error
26+
raise RdkafkaError.new(
27+
self[:response],
28+
broker_message: self[:response_string].read_string
29+
)
30+
end
31+
end
32+
end
33+
end
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
class DescribeConfigsReport
6+
attr_reader :resources
7+
8+
def initialize(config_entries:, entry_count:)
9+
@resources=[]
10+
11+
return if config_entries == FFI::Pointer::NULL
12+
13+
config_entries
14+
.read_array_of_pointer(entry_count)
15+
.each { |config_resource_result_ptr| validate!(config_resource_result_ptr) }
16+
.each do |config_resource_result_ptr|
17+
config_resource_result = ConfigResourceBindingResult.new(config_resource_result_ptr)
18+
19+
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
20+
configs_ptr = Bindings.rd_kafka_ConfigResource_configs(
21+
config_resource_result_ptr,
22+
pointer_to_size_t
23+
)
24+
25+
configs_ptr
26+
.read_array_of_pointer(pointer_to_size_t.read_int)
27+
.map { |config_ptr| ConfigBindingResult.new(config_ptr) }
28+
.each { |config_binding| config_resource_result.configs << config_binding }
29+
30+
@resources << config_resource_result
31+
end
32+
ensure
33+
return if config_entries == FFI::Pointer::NULL
34+
35+
Bindings.rd_kafka_ConfigResource_destroy_array(config_entries, entry_count)
36+
end
37+
38+
private
39+
40+
def validate!(config_resource_result_ptr)
41+
code = Bindings.rd_kafka_ConfigResource_error(config_resource_result_ptr)
42+
43+
return if code.zero?
44+
45+
raise(
46+
RdkafkaError.new(
47+
code,
48+
Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr)
49+
)
50+
)
51+
end
52+
end
53+
end
54+
end

0 commit comments

Comments
 (0)