Producer support for per-topic configs #449

Merged (8 commits, May 8, 2024)
Changes from 7 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,8 @@
## 0.16.0 (Unreleased)
- **[Feature]** Support incremental config describe + alter API.
- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
- **[Feature]** Provide the ability to use a topic config on a producer for custom behaviors per dispatch.
- [Enhancement] Use a topic config reference cache for message production to prevent topic object allocation with each message.
- [Enhancement] Provide `Rdkafka::Admin#describe_errors` to get error descriptions (mensfeld)
- [Enhancement] Replace the time-poll based wait engine with an event-based one to improve response times on blocking operations and wait (nijikon + mensfeld)
- [Enhancement] Allow for usage of the second regex engine of librdkafka by setting `RDKAFKA_DISABLE_REGEX_EXT` during build (mensfeld)
3 changes: 3 additions & 0 deletions lib/rdkafka/bindings.rb
@@ -167,6 +167,9 @@ class NativeErrorDesc < FFI::Struct
# Log queue
attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void
attach_function :rd_kafka_queue_get_main, [:pointer], :pointer
# Per topic configs
attach_function :rd_kafka_topic_conf_new, [], :pointer
attach_function :rd_kafka_topic_conf_set, [:pointer, :string, :string, :pointer, :int], :kafka_config_response

LogCallback = FFI::Function.new(
:void, [:pointer, :int, :string, :string]
89 changes: 85 additions & 4 deletions lib/rdkafka/producer.rb
@@ -9,7 +9,10 @@ class Producer
# Cache partitions count for 30 seconds
PARTITIONS_COUNT_TTL = 30

private_constant :PARTITIONS_COUNT_TTL
# Empty hash used as a default
EMPTY_HASH = {}.freeze

private_constant :PARTITIONS_COUNT_TTL, :EMPTY_HASH

# @private
# Returns the current delivery callback, by default this is nil.
@@ -28,6 +31,8 @@ class Producer
# @param partitioner_name [String, nil] name of the partitioner we want to use or nil to use
# the "consistent_random" default
def initialize(native_kafka, partitioner_name)
@topics_refs_map = {}
@topics_configs = {}
@native_kafka = native_kafka
@partitioner_name = partitioner_name || "consistent_random"

@@ -54,6 +59,48 @@ def initialize(native_kafka, partitioner_name)
end
end

# Sets an alternative set of configuration details that can be applied per topic
# @note Re-setting the same topic config twice is not allowed because of the underlying
# librdkafka caching
# @param topic [String] The topic name
# @param config [Hash] config we want to use on a per-topic basis
# @param config_hash [Integer] hash of the config. We expect it here instead of computing it,
# because it is already computed during the retrieval attempt in the `#produce` flow.
def set_topic_config(topic, config, config_hash)
# Ensure lock on topic reference just in case
@native_kafka.with_inner do |inner|
@topics_refs_map[topic] ||= {}
@topics_configs[topic] ||= {}

return if @topics_configs[topic].key?(config_hash)

# If config is empty, we create an empty reference that will be used with defaults
rd_topic_config = if config.empty?
nil
else
Rdkafka::Bindings.rd_kafka_topic_conf_new.tap do |topic_config|
config.each do |key, value|
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
result = Rdkafka::Bindings.rd_kafka_topic_conf_set(
topic_config,
key.to_s,
value.to_s,
error_buffer,
256
)

unless result == :config_ok
raise Config::ConfigError.new(error_buffer.read_string)
end
end
end
end

@topics_configs[topic][config_hash] = config
@topics_refs_map[topic][config_hash] = Bindings.rd_kafka_topic_new(inner, topic, rd_topic_config)
end
end
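The method above caches one native topic reference per `(topic, config.hash)` pair, so producing repeatedly with an equal config reuses the same reference. A minimal pure-Ruby sketch of that keying (the `:events_ref` symbols stand in for native pointers; all names are illustrative):

```ruby
# Cache per-topic references keyed by the config's Hash#hash, mirroring
# @topics_refs_map above. Equal Ruby hashes have equal #hash values,
# so an equal config reuses the cached reference instead of building a new one.
cache = Hash.new { |h, k| h[k] = {} }

config_a = { 'message.timeout.ms': 1 }
config_b = { 'message.timeout.ms': 1 } # equal to config_a, different object

cache['events'][config_a.hash] ||= :events_ref
cache['events'][config_b.hash] ||= :would_be_new_ref # no-op, key already present

# cache['events'] now holds a single entry for both produce calls
```

This is also why the PR forbids re-setting the same topic config: the native reference for a given hash is created once and reused for the producer's lifetime.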

# Starts the native Kafka polling thread and kicks off the init polling
# @note Not needed to run unless explicit start was disabled
def start
@@ -83,7 +130,18 @@ def delivery_callback=(callback)
def close
return if closed?
ObjectSpace.undefine_finalizer(self)
@native_kafka.close

@native_kafka.close do
# We need to remove the topic reference objects before we destroy the producer,
# otherwise they would leak
@topics_refs_map.each_value do |refs|
refs.each_value do |ref|
Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
end
end
end

@topics_refs_map.clear
end
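The close hook above releases every cached native topic reference while the producer handle is still alive, then clears the map. A pure-Ruby sketch of that teardown order (illustrative names; the real references are FFI pointers released via `rd_kafka_topic_destroy`):

```ruby
# Sketch of #close: the hook passed to the handle's close runs first,
# "destroying" each cached topic reference, and the map is cleared after.
topics_refs_map = {
  'events' => { 111 => :events_ref },
  'logs'   => { 222 => :logs_ref }
}
destroyed = []

close_handle = lambda do |&hook|
  hook.call # references must be released before the handle goes away
end

close_handle.call do
  topics_refs_map.each_value do |refs|
    refs.each_value { |ref| destroyed << ref }
  end
end

topics_refs_map.clear
```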

# Whether this producer has closed
@@ -182,11 +240,22 @@ def partition_count(topic)
# @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.
# @param headers [Hash<String,String>] Optional message headers
# @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report
# @param topic_config [Hash] topic config for a given message dispatch. Allows sending messages to topics with different configurations
#
# @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message
#
# @raise [RdkafkaError] When adding the message to rdkafka's queue failed
def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil)
def produce(
topic:,
payload: nil,
key: nil,
partition: nil,
partition_key: nil,
timestamp: nil,
headers: nil,
label: nil,
topic_config: EMPTY_HASH
)
closed_producer_check(__method__)

# Start by checking and converting the input
@@ -205,8 +274,20 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
key.bytesize
end

topic_config_hash = topic_config.hash

# Checks if we have the rdkafka topic reference object ready. It saves us an object
# allocation and allows using a custom config on demand.
set_topic_config(topic, topic_config, topic_config_hash) unless @topics_refs_map.dig(topic, topic_config_hash)
topic_ref = @topics_refs_map.dig(topic, topic_config_hash)

if partition_key
partition_count = partition_count(topic)

# Check if there is an override for the partitioner and use the default one only when
# no per-topic partitioner is present.
partitioner_name = @topics_configs.dig(topic, topic_config_hash, :partitioner) || @partitioner_name

# If the topic is not present, set to -1
partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, partitioner_name) if partition_count.positive?
end
@@ -236,7 +317,7 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
DeliveryHandle.register(delivery_handle)

args = [
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_RKT, :pointer, topic_ref,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
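The partitioner selection inside `#produce` digs for a per-topic override under the config hash key and falls back to the producer-wide default. A small pure-Ruby sketch of that lookup (values illustrative):

```ruby
# Per-topic partitioner override with fallback to the producer default,
# mirroring the @topics_configs.dig(...) lookup in #produce.
default_partitioner = 'consistent_random'
topics_configs = {
  'events' => { 12_345 => { partitioner: 'murmur2' } }
}

# Override present for this topic + config hash
override = topics_configs.dig('events', 12_345, :partitioner) || default_partitioner

# No entry at all for this topic: dig returns nil, the default wins
fallback = topics_configs.dig('orders', 999, :partitioner) || default_partitioner
```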
42 changes: 42 additions & 0 deletions spec/rdkafka/producer_spec.rb
@@ -31,6 +31,48 @@
it { expect(producer.name).to include('rdkafka#producer-') }
end

describe '#produce with topic config alterations' do
context 'when config is not valid' do
it 'expect to raise error' do
expect do
producer.produce(topic: 'test', payload: '', topic_config: { 'invalid': 'invalid' })
end.to raise_error(Rdkafka::Config::ConfigError)
end
end

context 'when config is valid' do
it 'expect not to raise error' do
expect do
producer.produce(topic: 'test', payload: '', topic_config: { 'acks': 1 }).wait
end.not_to raise_error
end

context 'when alteration should change behavior' do
# This is set incorrectly on purpose:
# if the alteration did not work, this would hang the spec suite
let(:producer) do
rdkafka_producer_config(
'message.timeout.ms': 1_000_000,
'bootstrap.servers': 'localhost:9094'
).producer
end

it 'expect to give up on delivery fast based on alteration config' do
expect do
producer.produce(
topic: 'produce_config_test',
payload: 'test',
topic_config: {
'compression.type': 'gzip',
'message.timeout.ms': 1
}
).wait
end.to raise_error(Rdkafka::RdkafkaError, /msg_timed_out/)
end
end
end
end

context "delivery callback" do
context "with a proc/lambda" do
it "should set the callback" do