Skip to content

Commit bb57299

Browse files
authored
bucket notifications - clear connections map on cleanup (dfs 2834). also add logs, bump version (#9111)
Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent 501162c commit bb57299

File tree

3 files changed

+22
-10
lines changed

3 files changed

+22
-10
lines changed

package-lock.json

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
"morgan": "1.10.0",
108108
"nan": "2.22.2",
109109
"node-addon-api": "8.3.1",
110-
"node-rdkafka": "3.4.0",
110+
"node-rdkafka": "3.4.1",
111111
"performance-now": "2.1.0",
112112
"pg": "8.16.0",
113113
"ping": "0.4.4",

src/util/notifications_util.js

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,11 @@ class Notificator {
103103
throw err;
104104
} finally {
105105
await log.close();
106-
this.notif_to_connect.clear();
107106
for (const conn of this.connect_str_to_connection.values()) {
108107
conn.destroy();
109108
}
109+
this.connect_str_to_connection.clear();
110+
this.notif_to_connect.clear();
110111
}
111112
}
112113
}
@@ -272,18 +273,25 @@ class KafkaNotificator {
272273

273274
async connect() {
274275
this.connection = new Kafka.HighLevelProducer(this.connect_obj.kafka_options_object);
276+
dbg.log2("Kafka producer connecting, connect =", this.connect_obj);
275277
await new Promise((res, rej) => {
276278
this.connection.on('ready', () => {
279+
dbg.log2("Kafka producer connected for connection =", this.connect_obj);
277280
res();
278281
});
279282
this.connection.on('connection.failure', err => {
283+
dbg.error("Kafka producer failed to connect. connect = ", this.connect_obj, ", err =", err);
280284
rej(err);
281285
});
282286
this.connection.on('event.log', arg => {
283-
dbg.log1("event log", arg);
287+
dbg.log2("event log", arg);
288+
});
289+
this.connection.on('event.error', arg => {
290+
dbg.error("event error =", arg);
284291
});
285292
this.connection.connect();
286293
});
294+
dbg.log2("Kafka producer's connect done, connect =", this.connect_obj);
287295
this.connection.setPollInterval(100);
288296
}
289297

@@ -296,10 +304,12 @@ class KafkaNotificator {
296304
Buffer.from(JSON.stringify(notif.notif)),
297305
null,
298306
Date.now(),
299-
(err, offset) => {
307+
err => {
300308
if (err) {
309+
dbg.error("Failed to notify. Connect =", connect_obj, ", notif =", notif);
301310
promise_failure_cb(notif, failure_ctxt, err).then(resolve);
302311
} else {
312+
dbg.log2("Kafka notify successful. Connect =", connect_obj, ", notif =", notif);
303313
resolve();
304314
}
305315
}
@@ -308,8 +318,10 @@ class KafkaNotificator {
308318
}
309319

310320
destroy() {
311-
this.connection.flush(10000);
312-
this.connection.disconnect();
321+
if (this.connection.isConnected()) {
322+
this.connection.flush(10000);
323+
this.connection.disconnect();
324+
}
313325
}
314326
}
315327

0 commit comments

Comments
 (0)