From d1ea28b7fc3402f1f924ab4d9083543fa8b51691 Mon Sep 17 00:00:00 2001
From: Martijn Imhoff
Date: Wed, 6 Nov 2024 17:29:03 +0100
Subject: [PATCH] feat: consume per partition

---
 INTRODUCTION.md                                 |   3 +-
 .../node-rdkafka/consumer-per-partition.md      | 130 +++++++++++++
 lib/kafka-consumer.js                           |  81 +++++++-
 src/kafka-consumer.cc                           | 149 +++++++++++---
 src/kafka-consumer.h                            |   3 +
 src/workers.cc                                  | 183 ++++++++++++++++++
 src/workers.h                                   |  19 ++
 types/rdkafka.d.ts                              |   8 +-
 8 files changed, 543 insertions(+), 33 deletions(-)
 create mode 100644 examples/node-rdkafka/consumer-per-partition.md

diff --git a/INTRODUCTION.md b/INTRODUCTION.md
index ecbb3c2a..1c2f1356 100644
--- a/INTRODUCTION.md
+++ b/INTRODUCTION.md
@@ -358,7 +358,7 @@ stream.consumer.commit(); // Commits all locally stored offsets
 You can also use the Standard API and manage callbacks and events yourself. You can choose different modes for consuming messages:
 
 * *Flowing mode*. This mode flows all of the messages it can read by maintaining an infinite loop in the event loop. It only stops when it detects the consumer has issued the `unsubscribe` or `disconnect` method.
-* *Non-flowing mode*. This mode reads a single message from Kafka at a time manually.
+* *Non-flowing mode*. This mode reads a single message from Kafka at a time manually. You may choose to read from a specific partition or from all partitions at once. [An example of how to consume from a specific partition is included](./examples/node-rdkafka/consumer-per-partition.md).
 
 The following example illustrates flowing mode:
 ```js
@@ -411,6 +411,7 @@ The following table lists important methods for this API.
 | `consumer.unsubscribe()` | Unsubscribes from the currently subscribed topics.<br><br>You cannot subscribe to different topics without calling the `unsubscribe()` method first. |
 | `consumer.consume(cb)` | Gets messages from the existing subscription as quickly as possible. If `cb` is specified, invokes `cb(err, message)`.<br><br>This method keeps a background thread running to do the work. Note that the number of threads in a Node.js process is limited by `UV_THREADPOOL_SIZE` (default: 4), and using them all up blocks other parts of the application that need threads. If you need multiple consumers, consider increasing `UV_THREADPOOL_SIZE` or using `consumer.consume(number, cb)` instead. |
 | `consumer.consume(number, cb)` | Gets `number` of messages from the existing subscription. If `cb` is specified, invokes `cb(err, message)`. |
+| `consumer.consume(number, topic, partition, cb)` | Gets `number` of messages from one partition of the given topic. The consumer must be subscribed to the topic. If `cb` is specified, invokes `cb(err, message)`. |
 | `consumer.commit()` | Commits all locally stored offsets |
 | `consumer.commit(topicPartition)` | Commits offsets specified by the topic partition |
 | `consumer.commitMessage(message)` | Commits the offsets specified by the message |
diff --git a/examples/node-rdkafka/consumer-per-partition.md b/examples/node-rdkafka/consumer-per-partition.md
new file mode 100644
index 00000000..6719f7c1
--- /dev/null
+++ b/examples/node-rdkafka/consumer-per-partition.md
@@ -0,0 +1,130 @@
+A consumer that is subscribed to multiple partitions can control the mix of messages consumed from each partition. How this works is explained in the [librdkafka FAQ](https://github.com/confluentinc/librdkafka/wiki/FAQ#what-are-partition-queues-and-why-are-some-partitions-slower-than-others).
+
+The example below simulates a slow partition 0 (2s per consume call), while the other partition consumes at a rate of 0.5s. To run the example, create a topic "test" with two partitions and produce 500 messages to each partition; an active producer is not required. Run the example to see the result, and run multiple instances to see rebalancing take effect.
+
+```js
+/*
+ * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
+ *
+ * Copyright (c) 2016 Blizzard Entertainment
+ *
+ * This software may be modified and distributed under the terms
+ * of the MIT license. See the LICENSE.txt file for details.
+ */
+
+var Kafka = require('../');
+
+var consumer = new Kafka.KafkaConsumer({
+  //'debug': 'all',
+  'metadata.broker.list': 'localhost:9092',
+  'group.id': 'test-group-' + Math.random(),
+  'enable.auto.commit': false,
+  'rebalance_cb': true,
+}, {
+  'auto.offset.reset': 'earliest', // start from the beginning
+});
+
+var topicName = 'test';
+
+// Keep track of which partitions are assigned.
+var assignments = [];
+
+// log debug messages, if debug is enabled
+consumer.on('event.log', function(log) {
+  console.log(log);
+});
+
+// log all errors
+consumer.on('event.error', function(err) {
+  console.error('Error from consumer');
+  console.error(err);
+});
+
+consumer.on('ready', function(arg) {
+  console.log('consumer ready: ' + JSON.stringify(arg));
+
+  consumer.subscribe([topicName]);
+
+  // Start a regular consume loop in flowing mode. This won't deliver any
+  // messages, because we will start consuming from the partitions directly;
+  // it is only required to serve the rebalance events.
+  consumer.consume();
+});
+
+// Start our own consume loops for all newly assigned partitions
+consumer.on('rebalance', function(err, updatedAssignments) {
+  console.log('rebalancing done, got partitions assigned: ', updatedAssignments.map(function(a) {
+    return a.partition;
+  }));
+
+  // Normally messages are forwarded to a general queue, which contains messages from all assigned partitions.
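+  // (Per the librdkafka FAQ linked above, librdkafka keeps a separate queue
+  // per assigned partition and normally forwards it into that general queue;
+  // disabling forwarding keeps each partition's messages on their own queue,
+  // where consume(number, topic, partition, cb) can read them.)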
+  // However, we want to consume per partition, so we need to disable this forwarding.
+  updatedAssignments.forEach(function (assignment) {
+    consumer.disableQueueForwarding(assignment);
+  });
+
+  // find new assignments
+  var newAssignments = updatedAssignments.filter(function (updatedAssignment) {
+    return !assignments.some(function (assignment) {
+      return assignment.partition === updatedAssignment.partition;
+    });
+  });
+
+  // update the global assignments array
+  assignments = updatedAssignments;
+
+  // then start consume loops for the new assignments
+  newAssignments.forEach(function (assignment) {
+    startConsumeMessages(assignment.partition);
+  });
+});
+
+function startConsumeMessages(partition) {
+  console.log('partition: ' + partition + ' starting to consume');
+
+  function consume() {
+    var isPartitionAssigned = assignments.some(function(assignment) {
+      return assignment.partition === partition;
+    });
+
+    if (!isPartitionAssigned) {
+      console.log('partition: ' + partition + ' stop consuming');
+      return;
+    }
+
+    // consume 5 messages at a time
+    consumer.consume(5, topicName, partition, callback);
+  }
+
+  function callback(err, messages) {
+    if (err) {
+      console.error(err);
+      return;
+    }
+
+    messages.forEach(function(message) {
+      // handle the message
+      console.log('partition ' + message.partition + ' value ' + message.value.toString());
+    });
+
+    // committing the last message of the batch also commits all earlier offsets
+    if (messages.length > 0) {
+      consumer.commitMessage(messages.pop());
+    }
+
+    // simulate slow processing on partition 0
+    setTimeout(consume, partition === 0 ? 2000 : 500);
+  }
+
+  // kick off the recursive consume loop
+  consume();
+}
+
+consumer.on('disconnected', function(arg) {
+  console.log('consumer disconnected. ' + JSON.stringify(arg));
+});
+
+// start the consumer
+consumer.connect();
+
+// stop this example after 30s
+setTimeout(function() {
+  consumer.disconnect();
+}, 30000);
+```
\ No newline at end of file
diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
index 75d84790..201b3a1d 100644
--- a/lib/kafka-consumer.js
+++ b/lib/kafka-consumer.js
@@ -155,6 +155,11 @@ KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs)
   this._consumeLoopTimeoutDelay = intervalMs;
 };
 
+KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) {
+  this._client.disableQueueForwarding(topicPartition);
+  return this;
+};
+
 /**
  * If true:
  * In consume(number, cb), we will wait for `timeoutMs` for the first message to be fetched.
@@ -430,6 +435,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
 };
 
 /**
+ * Read a number of messages from a specific topic and partition.
+ *
+ * This can be useful when consume performance differs per partition:
+ * consuming per partition prevents a slow partition from affecting the
+ * consumption of the other partitions.
+ *
+ * Because a consumer can be subscribed to multiple topics, the topic
+ * parameter is required to select the right partition.
+ *
+ * @param {number} size - Number of messages to read
+ * @param {string} topic - Name of the topic to read from
+ * @param {number} partition - Identifier of the partition to read from
+ * @param {KafkaConsumer~readCallback} cb - Callback invoked when the work is done.
+ *//**
  * Read a number of messages from Kafka.
  *
  * This method is similar to the main one, except that it reads a number
@@ -453,11 +472,22 @@ KafkaConsumer.prototype.unsubscribe = function() {
  * @param {KafkaConsumer~readCallback} cb - Callback to return when a message
  * is fetched.
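+ *
+ * @example <caption>A sketch: read five messages from partition 0 of the subscribed topic "test".</caption>
+ * consumer.consume(5, 'test', 0, function(err, messages) {
+ *   if (err) { throw err; }
+ *   messages.forEach(function(m) { console.log(m.partition, m.offset); });
+ * });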
  */
-KafkaConsumer.prototype.consume = function(number, cb) {
+KafkaConsumer.prototype.consume = function(number, topic, partition, cb) {
   var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
+  var self = this;
+
+  if ((number && typeof number === 'number') && typeof topic === 'string' && typeof partition === 'number') {
-  if ((number && typeof number === 'number') || (number && cb)) {
+    if (cb === undefined) {
+      cb = function() {};
+    } else if (typeof cb !== 'function') {
+      throw new TypeError('Callback must be a function');
+    }
+    this._consumeNumOfPartition(timeoutMs, number, topic, partition, cb);
+  } else if ((number && typeof number === 'number') || (number && topic)) {
+    // the topic argument was passed as the callback here
+    cb = topic;
     if (cb === undefined) {
       cb = function() {};
     } else if (typeof cb !== 'function') {
@@ -567,6 +597,53 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
 
 };
 
+/**
+ * Consume a number of messages from a specific topic and partition.
+ * Wrapped in a try/catch with proper error reporting. Should not be
+ * called directly; call consume instead.
+ *
+ * @private
+ * @see consume
+ */
+KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages, topic, partition, cb) {
+  var self = this;
+
+  this._client.consume(timeoutMs, numMessages, topic, partition, function(err, messages, eofEvents) {
+    if (err) {
+      err = LibrdKafkaError.create(err);
+      if (cb) {
+        cb(err);
+      }
+      return;
+    }
+
+    var currentEofEventsIndex = 0;
+
+    function emitEofEventsFor(messageIndex) {
+      while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) {
+        delete eofEvents[currentEofEventsIndex].messageIndex;
+        self.emit('partition.eof', eofEvents[currentEofEventsIndex]);
+        ++currentEofEventsIndex;
+      }
+    }
+
+    emitEofEventsFor(-1);
+
+    for (var i = 0; i < messages.length; i++) {
+      self.emit('data', messages[i]);
+      emitEofEventsFor(i);
+    }
+
+    emitEofEventsFor(messages.length);
+
+    if (cb) {
+      cb(null, messages);
+    }
+
+  }, this._consumeIsTimeoutOnlyForFirstMessage);
+
+};
+
 /**
  * This callback returns the message read from Kafka.
  *
diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index 0ea74420..3967ddd4 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -522,6 +522,30 @@ std::string KafkaConsumer::RebalanceProtocol() {
   return m_consumer->rebalance_protocol();
 }
 
+Baton KafkaConsumer::DisableQueueForwarding(RdKafka::TopicPartition * toppar) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
+  }
+
+  // Look up the queue owned by this partition
+  RdKafka::Queue *queue = m_client->get_partition_queue(toppar);
+
+  if (queue == NULL) {
+    return Baton(RdKafka::ERR__STATE,
+                 "TopicPartition has an invalid queue.");
+  }
+
+  // Forwarding to NULL disables forwarding for this queue
+  RdKafka::ErrorCode err = queue->forward(NULL);
+  if (err != RdKafka::ERR_NO_ERROR) {
+    delete queue;
+    return Baton(RdKafka::ERR__STATE,
+                 "Could not disable queue forwarding for the given partition.");
+  }
+
+  delete queue;
+  return Baton(err);
+}
+
 Nan::Persistent<v8::Function> KafkaConsumer::constructor;
 
 void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
@@ -583,6 +607,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
   Nan::SetPrototypeMethod(tpl, "assignmentLost", NodeAssignmentLost);
   Nan::SetPrototypeMethod(tpl, "rebalanceProtocol", NodeRebalanceProtocol);
+  Nan::SetPrototypeMethod(tpl, "disableQueueForwarding", NodeDisableQueueForwarding);  // NOLINT
 
   Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
   Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
@@ -774,6 +799,36 @@ NAN_METHOD(KafkaConsumer::NodeRebalanceProtocol) {
   info.GetReturnValue().Set(Nan::New(protocol).ToLocalChecked());
 }
 
+NAN_METHOD(KafkaConsumer::NodeDisableQueueForwarding) {
+  Nan::HandleScope scope;
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  if (!consumer->IsConnected()) {
+    Nan::ThrowError("KafkaConsumer is disconnected");
+    return;
+  }
+
+  if (info[0]->IsObject()) {
+    RdKafka::TopicPartition * toppar =
+      Conversion::TopicPartition::FromV8Object(info[0].As<v8::Object>());
+
+    if (toppar == NULL) {
+      Nan::ThrowError("Invalid topic partition provided");
+      return;
+    }
+
+    Baton b = consumer->DisableQueueForwarding(toppar);
+
+    delete toppar;
+
+    // Surface failures to JS instead of silently discarding the Baton
+    if (b.err() != RdKafka::ERR_NO_ERROR) {
+      Nan::ThrowError(b.errstr().c_str());
+      return;
+    }
+  } else {
+    Nan::ThrowError("First parameter must be an object");
+    return;
+  }
+
+  info.GetReturnValue().Set(Nan::Null());
+}
+
 NAN_METHOD(KafkaConsumer::NodeAssign) {
   Nan::HandleScope scope;
@@ -1388,42 +1443,82 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
   }
 
   if (info[1]->IsNumber()) {
-    if (!info[2]->IsBoolean()) {
-      return Nan::ThrowError("Need to specify a boolean");
-    }
+    if (info[2]->IsString() && info[3]->IsNumber()) {
+      // Consume from a specific partition
+      if (!info[4]->IsFunction()) {
+        return Nan::ThrowError("Need to specify a callback");
+      }
 
-    if (!info[3]->IsFunction()) {
-      return Nan::ThrowError("Need to specify a callback");
-    }
+      v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
+      Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
 
-    v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
-    Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
+      uint32_t numMessages;
+      if (numMessagesMaybe.IsNothing()) {
+        return Nan::ThrowError("Parameter must be a number over 0");
+      } else {
+        numMessages = numMessagesMaybe.FromJust();
+      }
 
-    uint32_t numMessages;
-    if (numMessagesMaybe.IsNothing()) {
-      return Nan::ThrowError("Parameter must be a number over 0");
-    } else {
-      numMessages = numMessagesMaybe.FromJust();
-    }
+      // Get a string pointer for the topic name
+      Nan::Utf8String topicUTF8(Nan::To<v8::String>(info[2]).ToLocalChecked());
+      std::string topic_name(*topicUTF8);
 
+      // Parse the partition
+      v8::Local<v8::Number> partitionNumber = info[3].As<v8::Number>();
+      Nan::Maybe<uint32_t> partitionMaybe = Nan::To<uint32_t>(partitionNumber);  // NOLINT
+
+      uint32_t partition;
+      if (partitionMaybe.IsNothing()) {
+        return Nan::ThrowError("Parameter must be a number equal to or over 0");
+      } else {
+        partition = partitionMaybe.FromJust();
+      }
+
+      // Parse onlyApplyTimeoutToFirstMessage, defaulting to false
+      bool isTimeoutOnlyForFirstMessage;
+      if (!Nan::To<bool>(info[5]).To(&isTimeoutOnlyForFirstMessage)) {
+        isTimeoutOnlyForFirstMessage = false;
+      }
+
+      KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+      v8::Local<v8::Function> cb = info[4].As<v8::Function>();
+      Nan::Callback *callback = new Nan::Callback(cb);
+      Nan::AsyncQueueWorker(
+        new Workers::KafkaConsumerConsumeNumOfPartition(callback, consumer, numMessages, topic_name, partition, timeout_ms, isTimeoutOnlyForFirstMessage));  // NOLINT
+    } else {
+      if (!info[2]->IsBoolean()) {
+        return Nan::ThrowError("Need to specify a boolean");
+      }
+
+      if (!info[3]->IsFunction()) {
+        return Nan::ThrowError("Need to specify a callback");
+      }
+
+      v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
+      Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
+
+      uint32_t numMessages;
+      if (numMessagesMaybe.IsNothing()) {
+        return Nan::ThrowError("Parameter must be a number over 0");
+      } else {
+        numMessages = numMessagesMaybe.FromJust();
+      }
+
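+      // Original (number, boolean, cb) form: consume from the common
+      // forwarded queue, as before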
+      v8::Local<v8::Boolean> isTimeoutOnlyForFirstMessageBoolean = info[2].As<v8::Boolean>();  // NOLINT
+      Nan::Maybe<bool> isTimeoutOnlyForFirstMessageMaybe =
+        Nan::To<bool>(isTimeoutOnlyForFirstMessageBoolean);
+
+      bool isTimeoutOnlyForFirstMessage;
+      if (isTimeoutOnlyForFirstMessageMaybe.IsNothing()) {
+        return Nan::ThrowError("Parameter must be a boolean");
+      } else {
+        isTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessageMaybe.FromJust();  // NOLINT
+      }
+
+      KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+      v8::Local<v8::Function> cb = info[3].As<v8::Function>();
+      Nan::Callback *callback = new Nan::Callback(cb);
+      Nan::AsyncQueueWorker(
+        new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, isTimeoutOnlyForFirstMessage));  // NOLINT
+    }
   } else {
     if (!info[1]->IsFunction()) {
       return Nan::ThrowError("Need to specify a callback");
diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h
index e0d93562..e02527de 100644
--- a/src/kafka-consumer.h
+++ b/src/kafka-consumer.h
@@ -81,6 +81,8 @@ class KafkaConsumer : public Connection {
 
   std::string RebalanceProtocol();
 
+  Baton DisableQueueForwarding(RdKafka::TopicPartition*);
+
   Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);
 
   Baton Subscribe(std::vector<std::string>);
@@ -124,6 +126,7 @@ class KafkaConsumer : public Connection {
   static NAN_METHOD(NodeAssignments);
   static NAN_METHOD(NodeAssignmentLost);
   static NAN_METHOD(NodeRebalanceProtocol);
+  static NAN_METHOD(NodeDisableQueueForwarding);
   static NAN_METHOD(NodeUnsubscribe);
   static NAN_METHOD(NodeCommit);
   static NAN_METHOD(NodeCommitSync);
diff --git a/src/workers.cc b/src/workers.cc
index 3df8ece6..0ebd62ff 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -794,6 +794,189 @@ void KafkaConsumerConsumeLoop::HandleErrorCallback() {
   callback->Call(argc, argv);
 }
 
+
+/**
+ * @brief KafkaConsumer get messages per partition worker.
+ *
+ * This worker gets a number of messages from a specific partition.
+ * Can be of use in streams or places where you don't want an infinite
+ * loop managed in C++ land and would rather manage it in Node.
+ *
+ * @see RdKafka::KafkaConsumer::Consume
+ * @see NodeKafka::KafkaConsumer::GetMessage
+ */
+
+KafkaConsumerConsumeNumOfPartition::KafkaConsumerConsumeNumOfPartition(
+  Nan::Callback *callback,
+  KafkaConsumer* consumer,
+  const uint32_t & num_messages,
+  const std::string topic,
+  const uint32_t & partition,
+  const int & timeout_ms,
+  const bool only_apply_timeout_to_first_message) :
+  ErrorAwareWorker(callback),
+  m_consumer(consumer),
+  m_num_messages(num_messages),
+  m_topic(topic),
+  m_partition(partition),
+  m_timeout_ms(timeout_ms),
+  m_only_apply_timeout_to_first_message(only_apply_timeout_to_first_message) {}
+
+KafkaConsumerConsumeNumOfPartition::~KafkaConsumerConsumeNumOfPartition() {}
+
+void KafkaConsumerConsumeNumOfPartition::Execute() {
+  std::size_t max = static_cast<std::size_t>(m_num_messages);
+  bool looping = true;
+  int timeout_ms = m_timeout_ms;
+  std::size_t eof_event_count = 0;
+
+  // Look up the queue that belongs to our partition
+  RdKafka::TopicPartition *topicPartition = RdKafka::TopicPartition::create(
+    m_topic, m_partition);
+
+  if (topicPartition == NULL) {
+    SetErrorBaton(Baton(RdKafka::ERR__STATE,
+      "TopicPartition not found."));
+
+    return;
+  }
+
+  RdKafka::Queue *queue = m_consumer->GetClient()->get_partition_queue(
+    topicPartition);
+
+  if (queue == NULL) {
+    SetErrorBaton(Baton(RdKafka::ERR__STATE,
+      "TopicPartition has an invalid queue."));
+    delete topicPartition;
+    return;
+  }
+
+  while (m_messages.size() - eof_event_count < max && looping) {
+    if (!m_consumer->IsConnected()) {
+      if (m_messages.size() == eof_event_count) {
+        SetErrorBaton(Baton(RdKafka::ERR__STATE,
+          "KafkaConsumer is not connected"));
+      }
+      looping = false;
+      continue;
+    }
+
+    // Get a message
+    RdKafka::Message *message = queue->consume(timeout_ms);
+    RdKafka::ErrorCode errorCode = message->err();
+
+    // If true, do not wait after the first message. This causes the loop to
+    // consume only what has already been fetched, and then return immediately
+    if (m_only_apply_timeout_to_first_message) {
+      timeout_ms = 1;
+    }
+
+    switch (errorCode) {
+      case RdKafka::ERR__PARTITION_EOF:
+        // If we hit partition EOF and have consumed messages already, retry
+        // with timeout 1. This returns the already-fetched messages, while
+        // not waiting for new ones
+        if (m_messages.size() > eof_event_count) {
+          timeout_ms = 1;
+        }
+
+        // We will only go into this code path when `enable.partition.eof` is
+        // set to true.
+        // In that case the consumer is also interested in EOF
+        // events, so we return the EOF message as well
+        m_messages.push_back(message);
+        eof_event_count += 1;
+        break;
+      case RdKafka::ERR__TIMED_OUT:
+      case RdKafka::ERR__TIMED_OUT_QUEUE:
+        // Break out of the loop if we timed out
+        delete message;
+        looping = false;
+        break;
+      case RdKafka::ERR_NO_ERROR:
+        m_messages.push_back(message);
+        break;
+      default:
+        // Set the error for any other error code and break
+        delete message;
+        if (m_messages.size() == eof_event_count) {
+          SetErrorBaton(Baton(errorCode));
+        }
+        looping = false;
+        break;
+    }
+  }
+
+  delete queue;
+  delete topicPartition;
+}
+
+void KafkaConsumerConsumeNumOfPartition::HandleOKCallback() {
+  Nan::HandleScope scope;
+  const unsigned int argc = 3;
+  v8::Local<v8::Value> argv[argc];
+  argv[0] = Nan::Null();
+
+  v8::Local<v8::Array> returnArray = Nan::New<v8::Array>();
+  v8::Local<v8::Array> eofEventsArray = Nan::New<v8::Array>();
+
+  if (m_messages.size() > 0) {
+    int returnArrayIndex = -1;
+    int eofEventsArrayIndex = -1;
+    for (std::vector<RdKafka::Message*>::iterator it = m_messages.begin();
+        it != m_messages.end(); ++it) {
+      RdKafka::Message* message = *it;
+
+      switch (message->err()) {
+        case RdKafka::ERR_NO_ERROR:
+          ++returnArrayIndex;
+          Nan::Set(returnArray, returnArrayIndex,
+            Conversion::Message::ToV8Object(message));
+          break;
+        case RdKafka::ERR__PARTITION_EOF:
+          ++eofEventsArrayIndex;
+
+          // create an EOF event
+          v8::Local<v8::Object> eofEvent = Nan::New<v8::Object>();
+
+          Nan::Set(eofEvent, Nan::New("topic").ToLocalChecked(),
+            Nan::New(message->topic_name()).ToLocalChecked());
+          Nan::Set(eofEvent, Nan::New("offset").ToLocalChecked(),
+            Nan::New<v8::Number>(message->offset()));
+          Nan::Set(eofEvent, Nan::New("partition").ToLocalChecked(),
+            Nan::New<v8::Number>(message->partition()));
+
+          // also store at which index in the message array this event
+          // occurred, so it can later be emitted at the right point in time
+          Nan::Set(eofEvent, Nan::New("messageIndex").ToLocalChecked(),
+            Nan::New<v8::Number>(returnArrayIndex));
+
+          Nan::Set(eofEventsArray, eofEventsArrayIndex, eofEvent);
+      }
+
+      delete message;
+    }
+  }
+
+  argv[1] = returnArray;
+  argv[2] = eofEventsArray;
+
+  callback->Call(argc, argv);
+}
+
+void KafkaConsumerConsumeNumOfPartition::HandleErrorCallback() {
+  Nan::HandleScope scope;
+
+  if (m_messages.size() > 0) {
+    for (std::vector<RdKafka::Message*>::iterator it = m_messages.begin();
+        it != m_messages.end(); ++it) {
+      RdKafka::Message* message = *it;
+      delete message;
+    }
+  }
+
+  const unsigned int argc = 1;
+  v8::Local<v8::Value> argv[argc] = { GetErrorObject() };
+
+  callback->Call(argc, argv);
+}
+
 /**
  * @brief KafkaConsumer get messages worker.
  *
diff --git a/src/workers.h b/src/workers.h
index 94cac0ce..0732c065 100644
--- a/src/workers.h
+++ b/src/workers.h
@@ -454,6 +454,25 @@ class KafkaConsumerSeek : public ErrorAwareWorker {
   const int m_timeout_ms;
 };
 
+class KafkaConsumerConsumeNumOfPartition : public ErrorAwareWorker {
+ public:
+  KafkaConsumerConsumeNumOfPartition(Nan::Callback*, NodeKafka::KafkaConsumer*,
+    const uint32_t &, const std::string, const uint32_t &, const int &, const bool);  // NOLINT
+  ~KafkaConsumerConsumeNumOfPartition();
+
+  void Execute();
+  void HandleOKCallback();
+  void HandleErrorCallback();
+ private:
+  NodeKafka::KafkaConsumer * m_consumer;
+  const uint32_t m_num_messages;
+  const std::string m_topic;
+  const uint32_t m_partition;
+  const int m_timeout_ms;
+  std::vector<RdKafka::Message*> m_messages;
+  const bool m_only_apply_timeout_to_first_message;
+};
+
 class KafkaConsumerConsumeNum : public ErrorAwareWorker {
  public:
   KafkaConsumerConsumeNum(Nan::Callback*, NodeKafka::KafkaConsumer*,
diff --git a/types/rdkafka.d.ts b/types/rdkafka.d.ts
index 626fc8e9..14b18d04 100644
--- a/types/rdkafka.d.ts
+++ b/types/rdkafka.d.ts
@@ -228,9 +228,10 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
   committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffsetAndMetadata[]) => void): this;
   committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffsetAndMetadata[]) => void): this;
 
-  consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
-  consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
-  consume(): void;
+  consume(number: number, topic: string, partition: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
+  consume(number: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
+  consume(cb: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
+  consume(): void;
 
   getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
 
@@ -244,6 +244,8 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
 
   seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this;
 
+  disableQueueForwarding(topicPartition: TopicPartition): this;
+
   setDefaultConsumeTimeout(timeoutMs: number): void;
 
   setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;
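
Taken together, the JavaScript surface added by this patch can be exercised as follows: a minimal sketch that assumes a connected consumer subscribed to a topic "test", with partition 0 assigned to this instance.

```js
// After a rebalance assigns partitions, stop forwarding their queues
// into the general queue...
consumer.on('rebalance', function(err, assignments) {
  if (err) { return console.error(err); }
  assignments.forEach(function(assignment) {
    consumer.disableQueueForwarding(assignment);
  });
});

// ...then read batches straight from a single partition.
consumer.consume(10, 'test', 0, function(err, messages) {
  if (err) { return console.error(err); }
  console.log('read ' + messages.length + ' messages from partition 0');
});
```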