Skip to content

Commit 460592e

Browse files
authored
bucket notification - introduce batch param (#8685)
Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent 0d2edbc commit 460592e

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ config.BUCKET_LOG_CONCURRENCY = 10;
712712
////////////////////////////////
713713
config.NOTIFICATION_LOG_NS = 'notification_logging';
714714
config.NOTIFICATION_LOG_DIR = process.env.NOTIFICATION_LOG_DIR;
715+
config.NOTIFICATION_BATCH = process.env.BATCH || 10;
715716

716717
///////////////////////////
717718
// KEY ROTATOR //

src/util/notifications_util.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,14 @@ class Notificator {
3535
* @param {Object} options
3636
*/
3737

38-
constructor({name, fs_context, connect_files_dir, nc_config_fs}) {
38+
constructor({name, fs_context, connect_files_dir, nc_config_fs, batch_size}) {
3939
this.name = name;
4040
this.connect_str_to_connection = new Map();
4141
this.notif_to_connect = new Map();
4242
this.fs_context = fs_context ?? get_process_fs_context();
4343
this.connect_files_dir = connect_files_dir ?? DEFAULT_CONNECT_FILES_DIR;
4444
this.nc_config_fs = nc_config_fs;
45+
this.batch_size = batch_size || config.NOTIFICATION_BATCH || 10;
4546
}
4647

4748
async run_batch() {
@@ -112,7 +113,7 @@ class Notificator {
112113
*/
113114
async _notify(fs_context, log_file, failure_append) {
114115
const file = new LogFile(fs_context, log_file);
115-
const send_promises = [];
116+
let send_promises = [];
116117
await file.collect_and_process(async str => {
117118
try {
118119
const notif = JSON.parse(str);
@@ -137,6 +138,10 @@ class Notificator {
137138
}
138139
const send_promise = connection.promise_notify(notif, failure_append);
139140
if (send_promise) send_promises.push(send_promise);
141+
if (send_promises.length > this.batch_size) {
142+
await Promise.all(send_promises);
143+
send_promises = [];
144+
}
140145
} catch (err) {
141146
dbg.error("Failed to notify. err = ", err, ", str =", str);
142147
await failure_append(str);

0 commit comments

Comments
 (0)