Skip to content

Commit 17e2c18

Browse files
thynsonLAN Xingcan
authored and committed
Add incremental assign and unassign support
1 parent 76ec934 commit 17e2c18

File tree

4 files changed

+204
-0
lines changed

4 files changed

+204
-0
lines changed

index.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
223223
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
224224
consume(): void;
225225

226+
incrementalAssign(assigments: Assignment[]): this;
227+
228+
incrementalUnassign(assignments: Assignment[]): this;
229+
226230
getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
227231

228232
offsetsStore(topicPartitions: TopicPartitionOffset[]): any;

lib/kafka-consumer.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,19 @@ KafkaConsumer.prototype.assign = function(assignments) {
264264
return this;
265265
};
266266

267+
/**
268+
* Incremental assign the consumer specific partitions and topics
269+
*
270+
* @param {array} assignments - Assignments array. Should contain
271+
* objects with topic and partition set.
272+
* @return {Client} - Returns itself
273+
*/
274+
275+
KafkaConsumer.prototype.incrementalAssign = function(assignments) {
276+
this._client.incrementalAssign(TopicPartition.map(assignments));
277+
return this;
278+
};
279+
267280
/**
268281
* Unassign the consumer from its assigned partitions and topics.
269282
*
@@ -275,6 +288,18 @@ KafkaConsumer.prototype.unassign = function() {
275288
return this;
276289
};
277290

291+
/**
292+
* Incremental unassign the consumer from specific partitions and topics
293+
*
294+
* @param {array} assignments - Assignments array. Should contain
295+
* objects with topic and partition set.
296+
* @return {Client} - Returns itself
297+
*/
298+
299+
KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
300+
this._client.incrementalUnassign(TopicPartition.map(assignments));
301+
return this;
302+
};
278303

279304
/**
280305
* Get the assignments for the consumer

src/kafka-consumer.cc

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,31 @@ Baton KafkaConsumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
184184
return Baton(errcode);
185185
}
186186

187+
Baton KafkaConsumer::IncrementalAssign(std::vector<RdKafka::TopicPartition*> partitions) {
188+
if (!IsConnected()) {
189+
return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
190+
}
191+
192+
RdKafka::KafkaConsumer* consumer =
193+
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
194+
195+
RdKafka::Error *e = consumer->incremental_assign(partitions);
196+
197+
if (e) {
198+
RdKafka::ErrorCode errcode = e->code();
199+
delete e;
200+
return Baton(errcode);
201+
}
202+
203+
m_partition_cnt += partitions.size();
204+
for (auto i = partitions.begin(); i != partitions.end(); ++i) {
205+
m_partitions.push_back(*i);
206+
}
207+
partitions.clear();
208+
209+
return Baton(RdKafka::ERR_NO_ERROR);
210+
}
211+
187212
Baton KafkaConsumer::Unassign() {
188213
if (!IsClosing() && !IsConnected()) {
189214
return Baton(RdKafka::ERR__STATE);
@@ -200,12 +225,41 @@ Baton KafkaConsumer::Unassign() {
200225

201226
// Destroy the old list of partitions since we are no longer using it
202227
RdKafka::TopicPartition::destroy(m_partitions);
228+
m_partitions.clear();
203229

204230
m_partition_cnt = 0;
205231

206232
return Baton(RdKafka::ERR_NO_ERROR);
207233
}
208234

235+
Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> partitions) {
236+
if (!IsClosing() && !IsConnected()) {
237+
return Baton(RdKafka::ERR__STATE);
238+
}
239+
240+
RdKafka::KafkaConsumer* consumer =
241+
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
242+
243+
RdKafka::Error *e = consumer->incremental_unassign(partitions);
244+
if (e) {
245+
RdKafka::ErrorCode errcode = e->code();
246+
delete e;
247+
return Baton(errcode);
248+
}
249+
250+
// Destroy the old list of partitions since we are no longer using it
251+
RdKafka::TopicPartition::destroy(partitions);
252+
253+
m_partitions.erase(
254+
std::remove_if(m_partitions.begin(), m_partitions.end(), [&partitions](RdKafka::TopicPartition *x) -> bool {
255+
return std::find(partitions.begin(), partitions.end(), x) != partitions.end();
256+
}),
257+
m_partitions.end()
258+
);
259+
m_partition_cnt -= partitions.size();
260+
return Baton(RdKafka::ERR_NO_ERROR);
261+
}
262+
209263
Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
210264
if (!IsConnected()) {
211265
return Baton(RdKafka::ERR__STATE);
@@ -532,7 +586,9 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
532586
Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
533587
Nan::SetPrototypeMethod(tpl, "position", NodePosition);
534588
Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
589+
Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
535590
Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
591+
Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
536592
Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
537593

538594
Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
@@ -764,6 +820,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) {
764820
info.GetReturnValue().Set(Nan::True());
765821
}
766822

823+
NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
824+
Nan::HandleScope scope;
825+
826+
if (info.Length() < 1 || !info[0]->IsArray()) {
827+
// Just throw an exception
828+
return Nan::ThrowError("Need to specify an array of partitions");
829+
}
830+
831+
v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
832+
std::vector<RdKafka::TopicPartition*> topic_partitions;
833+
834+
for (unsigned int i = 0; i < partitions->Length(); ++i) {
835+
v8::Local<v8::Value> partition_obj_value;
836+
if (!(
837+
Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
838+
partition_obj_value->IsObject())) {
839+
Nan::ThrowError("Must pass topic-partition objects");
840+
}
841+
842+
v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
843+
844+
// Got the object
845+
int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
846+
std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
847+
848+
if (!topic.empty()) {
849+
RdKafka::TopicPartition* part;
850+
851+
if (partition < 0) {
852+
part = Connection::GetPartition(topic);
853+
} else {
854+
part = Connection::GetPartition(topic, partition);
855+
}
856+
857+
// Set the default value to offset invalid. If provided, we will not set
858+
// the offset.
859+
int64_t offset = GetParameter<int64_t>(
860+
partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
861+
if (offset != RdKafka::Topic::OFFSET_INVALID) {
862+
part->set_offset(offset);
863+
}
864+
865+
topic_partitions.push_back(part);
866+
}
867+
}
868+
869+
KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
870+
871+
// Hand over the partitions to the consumer.
872+
Baton b = consumer->IncrementalAssign(topic_partitions);
873+
874+
if (b.err() != RdKafka::ERR_NO_ERROR) {
875+
Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
876+
}
877+
878+
info.GetReturnValue().Set(Nan::True());
879+
}
880+
767881
NAN_METHOD(KafkaConsumer::NodeUnassign) {
768882
Nan::HandleScope scope;
769883

@@ -784,6 +898,63 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
784898
info.GetReturnValue().Set(Nan::True());
785899
}
786900

901+
NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
902+
Nan::HandleScope scope;
903+
904+
if (info.Length() < 1 || !info[0]->IsArray()) {
905+
// Just throw an exception
906+
return Nan::ThrowError("Need to specify an array of partitions");
907+
}
908+
909+
v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
910+
std::vector<RdKafka::TopicPartition*> topic_partitions;
911+
912+
for (unsigned int i = 0; i < partitions->Length(); ++i) {
913+
v8::Local<v8::Value> partition_obj_value;
914+
if (!(
915+
Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
916+
partition_obj_value->IsObject())) {
917+
Nan::ThrowError("Must pass topic-partition objects");
918+
}
919+
920+
v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
921+
922+
// Got the object
923+
int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
924+
std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
925+
926+
if (!topic.empty()) {
927+
RdKafka::TopicPartition* part;
928+
929+
if (partition < 0) {
930+
part = Connection::GetPartition(topic);
931+
} else {
932+
part = Connection::GetPartition(topic, partition);
933+
}
934+
935+
// Set the default value to offset invalid. If provided, we will not set
936+
// the offset.
937+
int64_t offset = GetParameter<int64_t>(
938+
partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
939+
if (offset != RdKafka::Topic::OFFSET_INVALID) {
940+
part->set_offset(offset);
941+
}
942+
943+
topic_partitions.push_back(part);
944+
}
945+
}
946+
947+
KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
948+
// Hand over the partitions to the consumer.
949+
Baton b = consumer->IncrementalUnassign(topic_partitions);
950+
951+
if (b.err() != RdKafka::ERR_NO_ERROR) {
952+
Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
953+
}
954+
955+
info.GetReturnValue().Set(Nan::True());
956+
}
957+
787958
NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
788959
Nan::HandleScope scope;
789960

src/kafka-consumer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ class KafkaConsumer : public Connection {
7272
int AssignedPartitionCount();
7373

7474
Baton Assign(std::vector<RdKafka::TopicPartition*>);
75+
Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
7576
Baton Unassign();
77+
Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);
7678

7779
Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);
7880

@@ -105,7 +107,9 @@ class KafkaConsumer : public Connection {
105107
static NAN_METHOD(NodeSubscribe);
106108
static NAN_METHOD(NodeDisconnect);
107109
static NAN_METHOD(NodeAssign);
110+
static NAN_METHOD(NodeIncrementalAssign);
108111
static NAN_METHOD(NodeUnassign);
112+
static NAN_METHOD(NodeIncrementalUnassign);
109113
static NAN_METHOD(NodeAssignments);
110114
static NAN_METHOD(NodeUnsubscribe);
111115
static NAN_METHOD(NodeCommit);

0 commit comments

Comments
 (0)