Skip to content

Commit 85193d8

Browse files
author
LAN Xingcan
committed
Implement assignment lost
1 parent ee7bf64 commit 85193d8

File tree

4 files changed

+44
-2
lines changed

4 files changed

+44
-2
lines changed

deps/librdkafka

lib/kafka-consumer.js

+11
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,17 @@ KafkaConsumer.prototype.position = function(toppars) {
392392
return this._errorWrap(this._client.position(toppars), true);
393393
};
394394

395+
/**
396+
* Check whether the consumer considers the current assignment to have been
397+
* lost invountarily.
398+
*
399+
* @throws Throws from native land if
400+
* @returns {boolean} Whether the assignment have been lost or not
401+
*/
402+
KafkaConsumer.prototype.assimentLost = function() {
403+
return this._client.assignmentLost();
404+
}
405+
395406
/**
396407
* Unsubscribe from all currently subscribed topics
397408
*

src/kafka-consumer.cc

+29
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,20 @@ Baton KafkaConsumer::Position(std::vector<RdKafka::TopicPartition*> &toppars) {
386386
return Baton(err);
387387
}
388388

389+
390+
Baton KafkaConsumer::AssigmentLost() {
391+
if (!IsConnected()) {
392+
return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
393+
}
394+
395+
RdKafka::KafkaConsumer* consumer =
396+
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
397+
398+
// XXX: Returning a bool by casting it to a pointer,
399+
return Baton(reinterpret_cast<void*>(
400+
static_cast<uintptr_t>(consumer->assignment_lost() ? true : false)));
401+
}
402+
389403
Baton KafkaConsumer::Subscription() {
390404
if (!IsConnected()) {
391405
return Baton(RdKafka::ERR__STATE, "Consumer is not connected");
@@ -585,6 +599,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
585599

586600
Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
587601
Nan::SetPrototypeMethod(tpl, "position", NodePosition);
602+
Nan::SetPrototypeMethod(tpl, "assignemntLost", NodeAssignmentLost);
588603
Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
589604
Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
590605
Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
@@ -745,6 +760,20 @@ NAN_METHOD(KafkaConsumer::NodePosition) {
745760
RdKafka::TopicPartition::destroy(toppars);
746761
}
747762

763+
NAN_METHOD(KafkaConsumer::NodeAssignmentLost) {
764+
765+
Nan::HandleScope scope;
766+
767+
KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
768+
769+
Baton b = consumer->AssigmentLost();
770+
if (b.err() != RdKafka::ERR_NO_ERROR) {
771+
Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
772+
}
773+
bool result = static_cast<bool>(reinterpret_cast<uintptr_t>(b.data<void*>()));
774+
info.GetReturnValue().Set(Nan::New<v8::Boolean>(result));
775+
}
776+
748777
NAN_METHOD(KafkaConsumer::NodeAssignments) {
749778
Nan::HandleScope scope;
750779

src/kafka-consumer.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class KafkaConsumer : public Connection {
6565

6666
Baton Committed(std::vector<RdKafka::TopicPartition*> &, int timeout_ms);
6767
Baton Position(std::vector<RdKafka::TopicPartition*> &);
68+
Baton AssigmentLost();
6869

6970
Baton RefreshAssignments();
7071

@@ -101,7 +102,7 @@ class KafkaConsumer : public Connection {
101102
bool m_is_subscribed = false;
102103

103104
void* m_consume_loop;
104-
105+
105106
// Node methods
106107
static NAN_METHOD(NodeConnect);
107108
static NAN_METHOD(NodeSubscribe);
@@ -117,6 +118,7 @@ class KafkaConsumer : public Connection {
117118
static NAN_METHOD(NodeOffsetsStore);
118119
static NAN_METHOD(NodeCommitted);
119120
static NAN_METHOD(NodePosition);
121+
static NAN_METHOD(NodeAssignmentLost);
120122
static NAN_METHOD(NodeSubscription);
121123
static NAN_METHOD(NodeSeek);
122124
static NAN_METHOD(NodeGetWatermarkOffsets);

0 commit comments

Comments
 (0)