Skip to content

Commit 038ce84

Browse files
authored
bucket notification - ensure all lines in a persitent log are processed (gh issue 8653) (#8662)
Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent 02f26cb commit 038ce84

File tree

1 file changed

+25
-20
lines changed

1 file changed

+25
-20
lines changed

src/util/notifications_util.js

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -114,28 +114,33 @@ class Notificator {
114114
const file = new LogFile(fs_context, log_file);
115115
const send_promises = [];
116116
await file.collect_and_process(async str => {
117-
const notif = JSON.parse(str);
118-
dbg.log2("notifying with notification =", notif);
119-
let connect = this.notif_to_connect.get(notif.meta.name);
120-
if (!connect) {
121-
connect = await this.parse_connect_file(notif.meta.connect);
122-
this.notif_to_connect.set(notif.meta.name, connect);
123-
}
124-
let connection = this.connect_str_to_connection.get(notif.meta.name);
125-
if (!connection) {
126-
connection = get_connection(connect);
127-
try {
128-
await connection.connect();
129-
} catch (err) {
130-
//failed to connect
131-
dbg.error("Connection failed for", connect);
132-
await failure_append(str);
133-
return;
117+
try {
118+
const notif = JSON.parse(str);
119+
dbg.log2("notifying with notification =", notif);
120+
let connect = this.notif_to_connect.get(notif.meta.name);
121+
if (!connect) {
122+
connect = await this.parse_connect_file(notif.meta.connect);
123+
this.notif_to_connect.set(notif.meta.name, connect);
134124
}
135-
this.connect_str_to_connection.set(notif.meta.name, connection);
125+
let connection = this.connect_str_to_connection.get(notif.meta.name);
126+
if (!connection) {
127+
connection = get_connection(connect);
128+
try {
129+
await connection.connect();
130+
} catch (err) {
131+
//failed to connect
132+
dbg.error("Connection failed for", connect);
133+
await failure_append(str);
134+
return;
135+
}
136+
this.connect_str_to_connection.set(notif.meta.name, connection);
137+
}
138+
const send_promise = connection.promise_notify(notif, failure_append);
139+
if (send_promise) send_promises.push(send_promise);
140+
} catch (err) {
141+
dbg.error("Failed to notify. err = ", err, ", str =", str);
142+
await failure_append(str);
136143
}
137-
const send_promise = connection.promise_notify(notif, failure_append);
138-
if (send_promise) send_promises.push(send_promise);
139144
});
140145
//note we can't reject promises here, since Promise.all() is rejected on
141146
//first rejected promise, and that would not await other send_promises()

0 commit comments

Comments
 (0)