@@ -12,9 +12,11 @@ const nb_native = require('../util/nb_native');
12
12
const NsfsObjectSDK = require ( '../sdk/nsfs_object_sdk' ) ;
13
13
const native_fs_utils = require ( '../util/native_fs_utils' ) ;
14
14
const { NoobaaEvent } = require ( './manage_nsfs_events_utils' ) ;
15
+ const notifications_util = require ( '../util/notifications_util' ) ;
15
16
const ManageCLIError = require ( './manage_nsfs_cli_errors' ) . ManageCLIError ;
16
17
const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME ,
17
18
is_desired_time, record_current_time } = require ( './manage_nsfs_cli_utils' ) ;
19
+ const SensitiveString = require ( '../util/sensitive_string' ) ;
18
20
19
21
// TODO:
20
22
// implement
@@ -155,8 +157,9 @@ async function process_bucket(config_fs, bucket_name, system_json) {
155
157
const account = { email : '' , nsfs_account_config : config_fs . fs_context , access_keys : [ ] } ;
156
158
const object_sdk = new NsfsObjectSDK ( '' , config_fs , account , bucket_json . versioning , config_fs . config_root , system_json ) ;
157
159
await object_sdk . _simple_load_requesting_account ( ) ;
160
+ const should_notify = notifications_util . should_notify_on_event ( bucket_json , notifications_util . OP_TO_EVENT . lifecycle_delete . name ) ;
158
161
if ( ! bucket_json . lifecycle_configuration_rules ) return { } ;
159
- await process_rules ( config_fs , bucket_json , object_sdk ) ;
162
+ await process_rules ( config_fs , bucket_json , object_sdk , should_notify ) ;
160
163
}
161
164
162
165
/**
@@ -165,15 +168,15 @@ async function process_bucket(config_fs, bucket_name, system_json) {
165
168
* @param {Object } bucket_json
166
169
* @param {nb.ObjectSDK } object_sdk
167
170
*/
168
- async function process_rules ( config_fs , bucket_json , object_sdk ) {
171
+ async function process_rules ( config_fs , bucket_json , object_sdk , should_notify ) {
169
172
try {
170
173
await P . all ( _ . map ( bucket_json . lifecycle_configuration_rules ,
171
174
async ( lifecycle_rule , index ) =>
172
175
await _call_op_and_update_status ( {
173
176
bucket_name : bucket_json . name ,
174
177
rule_id : lifecycle_rule . id ,
175
178
op_name : TIMED_OPS . PROCESS_RULE ,
176
- op_func : async ( ) => process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk )
179
+ op_func : async ( ) => process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk , should_notify )
177
180
} )
178
181
)
179
182
) ;
@@ -182,6 +185,40 @@ async function process_rules(config_fs, bucket_json, object_sdk) {
182
185
}
183
186
}
184
187
188
+ async function send_lifecycle_notifications ( delete_res , bucket_json , object_sdk ) {
189
+ const writes = [ ] ;
190
+ for ( const deleted_obj of delete_res ) {
191
+ if ( delete_res . err_code ) continue ;
192
+ for ( const notif of bucket_json . notifications ) {
193
+ if ( notifications_util . check_notif_relevant ( notif , {
194
+ op_name : 'lifecycle_delete' ,
195
+ s3_event_method : deleted_obj . delete_marker_created ? 'DeleteMarkerCreated' : 'Delete' ,
196
+ } ) ) {
197
+ //remember that this deletion needs a notif for this specific notification conf
198
+ writes . push ( { notif, deleted_obj} ) ;
199
+ }
200
+ }
201
+ }
202
+
203
+ //required format by compose_notification_lifecycle
204
+ bucket_json . bucket_owner = new SensitiveString ( bucket_json . bucket_owner ) ;
205
+
206
+ //if any notifications are needed, write them in notification log file
207
+ //(otherwise don't do any unnecessary filesystem actions)
208
+ if ( writes . length > 0 ) {
209
+ let logger ;
210
+ try {
211
+ logger = notifications_util . get_notification_logger ( 'SHARED' ) ;
212
+ await P . map_with_concurrency ( 100 , writes , async write => {
213
+ const notif = notifications_util . compose_notification_lifecycle ( write . deleted_obj , write . notif , bucket_json , object_sdk ) ;
214
+ logger . append ( JSON . stringify ( notif ) ) ;
215
+ } ) ;
216
+ } finally {
217
+ if ( logger ) logger . close ( ) ;
218
+ }
219
+ }
220
+ }
221
+
185
222
/**
186
223
* process_rule processes the lifecycle rule for a bucket
187
224
* TODO - implement notifications for the deleted objects (check if needed for abort mpus as well)
@@ -192,7 +229,7 @@ async function process_rules(config_fs, bucket_json, object_sdk) {
192
229
* @param {nb.ObjectSDK } object_sdk
193
230
* @returns {Promise<Void> }
194
231
*/
195
- async function process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk ) {
232
+ async function process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk , should_notify ) {
196
233
dbg . log0 ( 'nc_lifecycle.process_rule: start bucket name:' , bucket_json . name , 'rule' , lifecycle_rule , 'index' , index ) ;
197
234
const bucket_name = bucket_json . name ;
198
235
const rule_id = lifecycle_rule . id ;
@@ -209,7 +246,7 @@ async function process_rule(config_fs, lifecycle_rule, index, bucket_json, objec
209
246
} ) ;
210
247
211
248
if ( candidates . delete_candidates ?. length > 0 ) {
212
- await _call_op_and_update_status ( {
249
+ const delete_res = await _call_op_and_update_status ( {
213
250
bucket_name,
214
251
rule_id,
215
252
op_name : TIMED_OPS . DELETE_MULTIPLE_OBJECTS ,
@@ -218,6 +255,9 @@ async function process_rule(config_fs, lifecycle_rule, index, bucket_json, objec
218
255
objects : candidates . delete_candidates
219
256
} )
220
257
} ) ;
258
+ if ( should_notify ) {
259
+ await send_lifecycle_notifications ( delete_res , bucket_json , object_sdk ) ;
260
+ }
221
261
}
222
262
223
263
if ( candidates . abort_mpu_candidates ?. length > 0 ) {
0 commit comments