Skip to content

Support consumer key-shared policy #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export interface ConsumerConfig {
regexSubscriptionMode?: RegexSubscriptionMode;
deadLetterPolicy?: DeadLetterPolicy;
batchReceivePolicy?: ConsumerBatchReceivePolicy;
keySharedPolicy?: KeySharedPolicy;
}

export class Consumer {
Expand Down Expand Up @@ -194,6 +195,18 @@ export interface ConsumerBatchReceivePolicy {
timeoutMs?: number;
}

export interface ConsumerKeyShareStickyRange {
start: number;
end: number;
}
export type ConsumerKeyShareStickyRanges = ConsumerKeyShareStickyRange[];

export interface KeySharedPolicy {
keyShareMode?: ConsumerKeyShareMode;
allowOutOfOrderDelivery?: boolean;
stickyRanges?: ConsumerKeyShareStickyRanges;
}

export class AuthenticationTls {
constructor(params: { certificatePath: string, privateKeyPath: string });
}
Expand Down Expand Up @@ -296,6 +309,10 @@ export type ConsumerCryptoFailureAction =
'DISCARD' |
'CONSUME';

export type ConsumerKeyShareMode =
'AutoSplit' |
'Sticky';

export type RegexSubscriptionMode =
'PersistentOnly' |
'NonPersistentOnly' |
Expand Down
66 changes: 66 additions & 0 deletions src/ConsumerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "Consumer.h"
#include "SchemaInfo.h"
#include "Message.h"
#include "pulsar/ConsumerConfiguration.h"
#include <pulsar/c/consumer_configuration.h>
#include <pulsar/c/consumer.h>
#include <map>
Expand Down Expand Up @@ -55,6 +56,10 @@ static const std::string CFG_BATCH_RECEIVE_POLICY = "batchReceivePolicy";
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES = "maxNumMessages";
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES = "maxNumBytes";
static const std::string CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS = "timeoutMs";
static const std::string CFG_KEY_SHARED_POLICY = "keySharedPolicy";
static const std::string CFG_KEY_SHARED_POLICY_MODE = "keyShareMode";
static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER = "allowOutOfOrderDelivery";
static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges";

static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
Expand All @@ -76,6 +81,15 @@ static const std::map<std::string, pulsar_consumer_crypto_failure_action> CONSUM
{"CONSUME", pulsar_ConsumerConsume},
};

static const std::map<std::string, pulsar::KeySharedMode> CONSUMER_KEY_SHARED_POLICY_MODE = {
{"AutoSplit", pulsar::KeySharedMode::AUTO_SPLIT},
{"Sticky", pulsar::KeySharedMode::STICKY},
};

struct _pulsar_consumer_configuration {
pulsar::ConsumerConfiguration consumerConfiguration;
};

void FinalizeListenerCallback(Napi::Env env, MessageListenerCallback *cb, void *) { delete cb; }

ConsumerConfig::ConsumerConfig()
Expand Down Expand Up @@ -324,6 +338,58 @@ void ConsumerConfig::InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred,
return;
}
}

if (consumerConfig.Has(CFG_KEY_SHARED_POLICY) && consumerConfig.Get(CFG_KEY_SHARED_POLICY).IsObject()) {
Napi::Object propObj = consumerConfig.Get(CFG_KEY_SHARED_POLICY).ToObject();
pulsar::KeySharedPolicy cppKeySharedPolicy;

if (propObj.Has(CFG_KEY_SHARED_POLICY_MODE) && propObj.Get(CFG_KEY_SHARED_POLICY_MODE).IsString()) {
std::string keyShareModeStr = propObj.Get(CFG_KEY_SHARED_POLICY_MODE).ToString().Utf8Value();
if (CONSUMER_KEY_SHARED_POLICY_MODE.count(keyShareModeStr)) {
cppKeySharedPolicy.setKeySharedMode(CONSUMER_KEY_SHARED_POLICY_MODE.at(keyShareModeStr));
}
}

if (propObj.Has(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER) &&
propObj.Get(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER).IsBoolean()) {
bool allowOutOfOrderDelivery = propObj.Get(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER).ToBoolean();
cppKeySharedPolicy.setAllowOutOfOrderDelivery(allowOutOfOrderDelivery);
}

if (propObj.Has(CFG_KEY_SHARED_POLICY_STICKY_RANGES) &&
propObj.Get(CFG_KEY_SHARED_POLICY_STICKY_RANGES).IsArray()) {
Napi::Array rangesArray = propObj.Get(CFG_KEY_SHARED_POLICY_STICKY_RANGES).As<Napi::Array>();
pulsar::StickyRanges stickyRanges;
for (uint32_t i = 0; i < rangesArray.Length(); i++) {
if (rangesArray.Get(i).IsObject()) {
Napi::Object rangeObj = rangesArray.Get(i).ToObject();
if (rangeObj.Has("start") && rangeObj.Has("end") && rangeObj.Get("start").IsNumber() &&
rangeObj.Get("end").IsNumber()) {
int start = rangeObj.Get("start").ToNumber().Int32Value();
int end = rangeObj.Get("end").ToNumber().Int32Value();
if (start > end) {
std::string error = "Invalid sticky range at index " + std::to_string(i) + ": start (" +
std::to_string(start) + ") > end (" + std::to_string(end) + ")";
deferred->Reject(error);
return;
}
stickyRanges.emplace_back(start, end);
} else {
std::string error = "Invalid sticky range format at index " + std::to_string(i) +
": missing 'start'/'end' or invalid type, should be number type";
deferred->Reject(error);
return;
}
} else {
std::string error = "Sticky range element at index " + std::to_string(i) + " is not an object";
deferred->Reject(error);
return;
}
}
cppKeySharedPolicy.setStickyRanges(stickyRanges);
}
this->cConsumerConfig.get()->consumerConfiguration.setKeySharedPolicy(cppKeySharedPolicy);
}
}

ConsumerConfig::~ConsumerConfig() {
Expand Down
91 changes: 91 additions & 0 deletions tests/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,97 @@ const Pulsar = require('../index');
await producer.close();
await consumer.close();
});
test('testStickyConsumer', async () => {
const topicName = `KeySharedPolicyTest-sticky-consumer-${Date.now()}`;
const subName = 'SubscriptionName';
const numMessages = 1000;
const numConsumers = 3;

// Create producer with round-robin routing
const producer = await client.createProducer({
topic: topicName,
batchingEnabled: false,
messageRoutingMode: 'RoundRobinDistribution',
});

// Create 3 consumers with different hash ranges
const consumers = [];
const stickyRanges = [
{ start: 0, end: 1000 }, // let consumer 1 handle small range
{ start: 1001, end: 30000 },
{ start: 30001, end: 65535 },
];
let i = 0;
while (i < numConsumers) {
const consumer = await client.subscribe({
topic: topicName,
subscription: subName,
subscriptionType: 'KeyShared',
keySharedPolicy: {
keyShareMode: 'Sticky',
stickyRanges: [stickyRanges[i]],
},
});
consumers.push(consumer);
i += 1;
}

// Send messages with random keys
const keys = Array.from({ length: 300 }, (_, index) => index.toString());
let msgIndex = 0;
while (msgIndex < numMessages) {
const key = keys[Math.floor(Math.random() * keys.length)];
await producer.send({
data: Buffer.from(msgIndex.toString()),
partitionKey: key,
});
msgIndex += 1;
}

const assertKeyConsumerIndex = (keyToConsumer, key, expectedIndex) => {
const actualIndex = keyToConsumer.get(key);
expect(actualIndex).toBe(expectedIndex, `Key ${key} assigned to different consumer`);
};

// Verify message distribution
const messagesPerConsumer = Array(numConsumers).fill(0);
const keyToConsumer = new Map();
let messagesReceived = 0;
// eslint-disable-next-line no-restricted-syntax
for (const [index, consumer] of consumers.entries()) {
let msg;
// eslint-disable-next-line no-constant-condition
while (true) {
try {
msg = await consumer.receive(2000);
} catch (err) {
if (err.message.includes('TimeOut')) {
break;
} else {
console.error('Receive error:', err);
}
}
const key = msg.getPartitionKey() || msg.getOrderingKey();
messagesPerConsumer[index] += 1;
messagesReceived += 1;
if (keyToConsumer.has(key)) {
assertKeyConsumerIndex(keyToConsumer, key, index);
} else {
keyToConsumer.set(key, index);
}
await consumer.acknowledge(msg);
}
}
expect(messagesReceived).toBe(numMessages);

// Verify even distribution across consumers
console.log('Messages per consumer:', messagesPerConsumer);
// Consumer 0 are expected to receive a message count < 100
expect(messagesPerConsumer[0]).toBeLessThan(100);
// Consumer 1 and 2 are expected to receive a message count > 400
expect(messagesPerConsumer[1]).toBeGreaterThan(400);
expect(messagesPerConsumer[2]).toBeGreaterThan(400);
}, 20000);
});
});
})();
Loading