diff --git a/index.d.ts b/index.d.ts index 5097cbc..270c5e6 100644 --- a/index.d.ts +++ b/index.d.ts @@ -103,6 +103,7 @@ export interface ConsumerConfig { regexSubscriptionMode?: RegexSubscriptionMode; deadLetterPolicy?: DeadLetterPolicy; batchReceivePolicy?: ConsumerBatchReceivePolicy; + keySharedPolicy?: KeySharedPolicy; } export class Consumer { @@ -194,6 +195,18 @@ export interface ConsumerBatchReceivePolicy { timeoutMs?: number; } +export interface ConsumerKeyShareStickyRange { + start: number; + end: number; +} +export type ConsumerKeyShareStickyRanges = ConsumerKeyShareStickyRange[]; + +export interface KeySharedPolicy { + keyShareMode?: ConsumerKeyShareMode; + allowOutOfOrderDelivery?: boolean; + stickyRanges?: ConsumerKeyShareStickyRanges; +} + export class AuthenticationTls { constructor(params: { certificatePath: string, privateKeyPath: string }); } @@ -296,6 +309,10 @@ export type ConsumerCryptoFailureAction = 'DISCARD' | 'CONSUME'; +export type ConsumerKeyShareMode = + 'AutoSplit' | + 'Sticky'; + export type RegexSubscriptionMode = 'PersistentOnly' | 'NonPersistentOnly' | diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc index 23e7976..7b2b61c 100644 --- a/src/ConsumerConfig.cc +++ b/src/ConsumerConfig.cc @@ -21,6 +21,7 @@ #include "Consumer.h" #include "SchemaInfo.h" #include "Message.h" +#include "pulsar/ConsumerConfiguration.h" #include #include #include @@ -55,6 +56,10 @@ static const std::string CFG_BATCH_RECEIVE_POLICY = "batchReceivePolicy"; static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES = "maxNumMessages"; static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES = "maxNumBytes"; static const std::string CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS = "timeoutMs"; +static const std::string CFG_KEY_SHARED_POLICY = "keySharedPolicy"; +static const std::string CFG_KEY_SHARED_POLICY_MODE = "keyShareMode"; +static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER = "allowOutOfOrderDelivery"; +static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges"; static const std::map SUBSCRIPTION_TYPE = { {"Exclusive", pulsar_ConsumerExclusive}, @@ -76,6 +81,15 @@ static const std::map CONSUM {"CONSUME", pulsar_ConsumerConsume}, }; +static const std::map CONSUMER_KEY_SHARED_POLICY_MODE = { + {"AutoSplit", pulsar::KeySharedMode::AUTO_SPLIT}, + {"Sticky", pulsar::KeySharedMode::STICKY}, +}; + +struct _pulsar_consumer_configuration { + pulsar::ConsumerConfiguration consumerConfiguration; +}; + void FinalizeListenerCallback(Napi::Env env, MessageListenerCallback *cb, void *) { delete cb; } ConsumerConfig::ConsumerConfig() @@ -324,6 +338,58 @@ void ConsumerConfig::InitConfig(std::shared_ptr deferred, return; } } + + if (consumerConfig.Has(CFG_KEY_SHARED_POLICY) && consumerConfig.Get(CFG_KEY_SHARED_POLICY).IsObject()) { + Napi::Object propObj = consumerConfig.Get(CFG_KEY_SHARED_POLICY).ToObject(); + pulsar::KeySharedPolicy cppKeySharedPolicy; + + if (propObj.Has(CFG_KEY_SHARED_POLICY_MODE) && propObj.Get(CFG_KEY_SHARED_POLICY_MODE).IsString()) { + std::string keyShareModeStr = propObj.Get(CFG_KEY_SHARED_POLICY_MODE).ToString().Utf8Value(); + if (CONSUMER_KEY_SHARED_POLICY_MODE.count(keyShareModeStr)) { + cppKeySharedPolicy.setKeySharedMode(CONSUMER_KEY_SHARED_POLICY_MODE.at(keyShareModeStr)); + } + } + + if (propObj.Has(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER) && + propObj.Get(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER).IsBoolean()) { + bool allowOutOfOrderDelivery = propObj.Get(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER).ToBoolean(); + cppKeySharedPolicy.setAllowOutOfOrderDelivery(allowOutOfOrderDelivery); + } + + if (propObj.Has(CFG_KEY_SHARED_POLICY_STICKY_RANGES) && + propObj.Get(CFG_KEY_SHARED_POLICY_STICKY_RANGES).IsArray()) { + Napi::Array rangesArray = propObj.Get(CFG_KEY_SHARED_POLICY_STICKY_RANGES).As(); + pulsar::StickyRanges stickyRanges; + for (uint32_t i = 0; i < rangesArray.Length(); i++) { + if (rangesArray.Get(i).IsObject()) { + Napi::Object rangeObj = rangesArray.Get(i).ToObject(); + if (rangeObj.Has("start") && rangeObj.Has("end") && rangeObj.Get("start").IsNumber() && + rangeObj.Get("end").IsNumber()) { + int start = rangeObj.Get("start").ToNumber().Int32Value(); + int end = rangeObj.Get("end").ToNumber().Int32Value(); + if (start > end) { + std::string error = "Invalid sticky range at index " + std::to_string(i) + ": start (" + + std::to_string(start) + ") > end (" + std::to_string(end) + ")"; + deferred->Reject(error); + return; + } + stickyRanges.emplace_back(start, end); + } else { + std::string error = "Invalid sticky range format at index " + std::to_string(i) + + ": missing 'start'/'end' or invalid type, should be number type"; + deferred->Reject(error); + return; + } + } else { + std::string error = "Sticky range element at index " + std::to_string(i) + " is not an object"; + deferred->Reject(error); + return; + } + } + cppKeySharedPolicy.setStickyRanges(stickyRanges); + } + this->cConsumerConfig.get()->consumerConfiguration.setKeySharedPolicy(cppKeySharedPolicy); + } } ConsumerConfig::~ConsumerConfig() { diff --git a/tests/consumer.test.js b/tests/consumer.test.js index 3a7d203..426eba6 100644 --- a/tests/consumer.test.js +++ b/tests/consumer.test.js @@ -429,6 +429,97 @@ const Pulsar = require('../index'); await producer.close(); await consumer.close(); }); + test('testStickyConsumer', async () => { + const topicName = `KeySharedPolicyTest-sticky-consumer-${Date.now()}`; + const subName = 'SubscriptionName'; + const numMessages = 1000; + const numConsumers = 3; + + // Create producer with round-robin routing + const producer = await client.createProducer({ + topic: topicName, + batchingEnabled: false, + messageRoutingMode: 'RoundRobinDistribution', + }); + + // Create 3 consumers with different hash ranges + const consumers = []; + const stickyRanges = [ + { start: 0, end: 1000 }, // let consumer 1 handle small range + { start: 1001, end: 30000 }, + { start: 30001, end: 65535 }, + ]; + let i = 0; + while (i < numConsumers) { + const consumer = await client.subscribe({ + topic: topicName, + subscription: subName, + subscriptionType: 'KeyShared', + keySharedPolicy: { + keyShareMode: 'Sticky', + stickyRanges: [stickyRanges[i]], + }, + }); + consumers.push(consumer); + i += 1; + } + + // Send messages with random keys + const keys = Array.from({ length: 300 }, (_, index) => index.toString()); + let msgIndex = 0; + while (msgIndex < numMessages) { + const key = keys[Math.floor(Math.random() * keys.length)]; + await producer.send({ + data: Buffer.from(msgIndex.toString()), + partitionKey: key, + }); + msgIndex += 1; + } + + const assertKeyConsumerIndex = (keyToConsumer, key, expectedIndex) => { + const actualIndex = keyToConsumer.get(key); + expect(actualIndex).toBe(expectedIndex, `Key ${key} assigned to different consumer`); + }; + + // Verify message distribution + const messagesPerConsumer = Array(numConsumers).fill(0); + const keyToConsumer = new Map(); + let messagesReceived = 0; + // eslint-disable-next-line no-restricted-syntax + for (const [index, consumer] of consumers.entries()) { + let msg; + // eslint-disable-next-line no-constant-condition + while (true) { + try { + msg = await consumer.receive(2000); + } catch (err) { + if (err.message.includes('TimeOut')) { + break; + } else { + console.error('Receive error:', err); + } + } + const key = msg.getPartitionKey() || msg.getOrderingKey(); + messagesPerConsumer[index] += 1; + messagesReceived += 1; + if (keyToConsumer.has(key)) { + assertKeyConsumerIndex(keyToConsumer, key, index); + } else { + keyToConsumer.set(key, index); + } + await consumer.acknowledge(msg); + } + } + expect(messagesReceived).toBe(numMessages); + + // Verify even distribution across consumers + console.log('Messages per consumer:', messagesPerConsumer); + // Consumer 0 are expected to receive a message count < 100 + expect(messagesPerConsumer[0]).toBeLessThan(100); + // Consumer 1 and 2 are expected to receive a message count > 400 + expect(messagesPerConsumer[1]).toBeGreaterThan(400); + expect(messagesPerConsumer[2]).toBeGreaterThan(400); + }, 20000); }); }); })();