Skip to content

Commit 464e7b7

Browse files
authored
Merge pull request #8862 from nadavMiz/lifecycle_expire
NC | lifecycle - add expire rule
2 parents 3c2c68b + 4c423f9 commit 464e7b7

File tree

3 files changed

+317
-94
lines changed

3 files changed

+317
-94
lines changed

src/manage_nsfs/nc_lifecycle.js

Lines changed: 108 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const { write_stdout_response, throw_cli_error, get_service_status, NOOBAA_SERVI
1818
is_desired_time, record_current_time } = require('./manage_nsfs_cli_utils');
1919

2020
// TODO:
21-
// implement
21+
// implement
2222
// 1. notifications
2323
// 2. POSIX scanning and filtering per rule
2424
// 3. GPFS ILM policy and apply for scanning and filtering optimization
@@ -49,7 +49,7 @@ const TIMED_OPS = Object.freeze({
4949
/**
5050
* run_lifecycle_under_lock runs the lifecycle workflow under a file system lock
5151
* lifecycle workflow is being locked to prevent multiple instances from running the lifecycle workflow
52-
* @param {import('../sdk/config_fs').ConfigFS} config_fs
52+
* @param {import('../sdk/config_fs').ConfigFS} config_fs
5353
* @param {{disable_service_validation?: boolean, disable_runtime_validation?: boolean, short?: boolean}} flags
5454
*/
5555
async function run_lifecycle_under_lock(config_fs, flags) {
@@ -83,9 +83,9 @@ async function run_lifecycle_under_lock(config_fs, flags) {
8383
}
8484

8585
/**
86-
* run_lifecycle_or_timeout runs the lifecycle workflow or times out while calculating
86+
* run_lifecycle_or_timeout runs the lifecycle workflow or times out while calculating
8787
* and saving times and stats of the run on the global lifecycle status
88-
* @param {import('../sdk/config_fs').ConfigFS} config_fs
88+
* @param {import('../sdk/config_fs').ConfigFS} config_fs
8989
* @param {boolean} disable_service_validation
9090
* @returns {Promise<Void>}
9191
*/
@@ -104,7 +104,7 @@ async function run_lifecycle_or_timeout(config_fs, disable_service_validation) {
104104

105105
/**
106106
* run_lifecycle runs the lifecycle workflow
107-
* @param {import('../sdk/config_fs').ConfigFS} config_fs
107+
* @param {import('../sdk/config_fs').ConfigFS} config_fs
108108
* @param {boolean} disable_service_validation
109109
* @returns {Promise<Void>}
110110
*/
@@ -125,13 +125,13 @@ async function run_lifecycle(config_fs, disable_service_validation) {
125125

126126
/**
127127
* process_buckets iterates over buckets and handles their rules
128-
* @param {import('../sdk/config_fs').ConfigFS} config_fs
129-
* @param {String[]} bucket_names
128+
* @param {import('../sdk/config_fs').ConfigFS} config_fs
129+
* @param {String[]} bucket_names
130130
* @param {Object} system_json
131131
* @returns {Promise<Void>}
132132
*/
133133
async function process_buckets(config_fs, bucket_names, system_json) {
134-
const buckets_concurrency = 10; // TODO - think about it
134+
const buckets_concurrency = 10; // TODO - think about it
135135
await P.map_with_concurrency(buckets_concurrency, bucket_names, async bucket_name =>
136136
await _call_op_and_update_status({
137137
bucket_name,
@@ -182,10 +182,10 @@ async function process_rules(config_fs, bucket_json, object_sdk) {
182182
/**
183183
* process_rule processes the lifecycle rule for a bucket
184184
* TODO - implement notifications for the deleted objects (check if needed for abort mpus as well)
185-
* @param {import('../sdk/config_fs').ConfigFS} config_fs
186-
* @param {Object} lifecycle_rule
187-
* @param {number} index
188-
* @param {Object} bucket_json
185+
* @param {import('../sdk/config_fs').ConfigFS} config_fs
186+
* @param {Object} lifecycle_rule
187+
* @param {number} index
188+
* @param {Object} bucket_json
189189
* @param {nb.ObjectSDK} object_sdk
190190
* @returns {Promise<Void>}
191191
*/
@@ -233,14 +233,14 @@ async function process_rule(config_fs, lifecycle_rule, index, bucket_json, objec
233233

234234
/**
235235
* abort_mpus iterates over the abort mpu candidates and calls abort_object_upload
236-
* since abort_object_upload is not returning anything, we catch it in case of an error and assign err_code
236+
* since abort_object_upload is not returning anything, we catch it in case of an error and assign err_code
237237
* so it can be translated to error on stats
238238
* @param {*} candidates
239-
* @param {nb.ObjectSDK} object_sdk
239+
* @param {nb.ObjectSDK} object_sdk
240240
* @returns {Promise<Object[]>}
241241
*/
242242
async function abort_mpus(candidates, object_sdk) {
243-
const aborts_concurrency = 10; // TODO - think about it
243+
const aborts_concurrency = 10; // TODO - think about it
244244
const abort_mpus_reply = await P.map_with_concurrency(aborts_concurrency, candidates.abort_mpu_candidates,
245245
async candidate => {
246246
const candidate_info = { key: candidate.key, upload_id: candidate.obj_id };
@@ -261,13 +261,13 @@ async function abort_mpus(candidates, object_sdk) {
261261
/////////////////////////////////
262262

263263
/**
264-
* _should_lifecycle_run checks if lifecycle worker should run based on the following -
265-
* 1. lifecycle worker can be disabled
264+
* _should_lifecycle_run checks if lifecycle worker should run based on the following -
265+
* 1. lifecycle worker can be disabled
266266
* 2. lifecycle worker might run at time that does not match config.NC_LIFECYCLE_RUN_TIME
267267
* 3. previous run was in the delay time frame
268-
* @param {nb.NativeFSContext} fs_context
269-
* @param {String} lifecycle_timestamp_file_path
270-
* @param {Boolean} disable_runtime_validation
268+
* @param {nb.NativeFSContext} fs_context
269+
* @param {String} lifecycle_timestamp_file_path
270+
* @param {Boolean} disable_runtime_validation
271271
* @returns {Promise<Boolean>}
272272
*/
273273
async function _should_lifecycle_run(fs_context, lifecycle_timestamp_file_path, disable_runtime_validation) {
@@ -284,7 +284,7 @@ async function _should_lifecycle_run(fs_context, lifecycle_timestamp_file_path,
284284

285285
/**
286286
* throw_if_noobaa_not_active checks if system.json exists and the noobaa service is active
287-
* @param {import('../sdk/config_fs').ConfigFS} config_fs
287+
* @param {import('../sdk/config_fs').ConfigFS} config_fs
288288
* @param {Object} system_json
289289
*/
290290
async function throw_if_noobaa_not_active(config_fs, system_json) {
@@ -308,7 +308,7 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
308308
async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) {
309309
const candidates = { abort_mpu_candidates: [], delete_candidates: [] };
310310
if (lifecycle_rule.expiration) {
311-
candidates.delete_candidates = await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json);
311+
candidates.delete_candidates = await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk);
312312
if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) {
313313
const dm_candidates = await get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, bucket_json);
314314
candidates.delete_candidates = candidates.delete_candidates.concat(dm_candidates);
@@ -325,10 +325,11 @@ async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_contex
325325
return candidates;
326326
}
327327

328+
328329
/**
329330
* validate_rule_enabled checks if the rule is enabled and should be processed
330-
* @param {*} rule
331-
* @param {Object} bucket
331+
* @param {*} rule
332+
* @param {Object} bucket
332333
* @returns {boolean}
333334
*/
334335
function validate_rule_enabled(rule, bucket) {
@@ -348,25 +349,37 @@ function validate_rule_enabled(rule, bucket) {
348349
//////// EXPIRATION HELPERS ////////
349350
////////////////////////////////////
350351

352+
/**
 * Maps a single list-objects entry onto the object info shape that is handed to the lifecycle filter
 * (key, age in days, size and tags).
 * @param {Object} entry list object entry - reads `key`, `create_time`, `size` and `tagging`
 * @returns {{key: *, age: Number, size: *, tags: *}}
 */
function _get_lifecycle_object_info_from_list_object_entry(entry) {
    const { key, create_time, size, tagging } = entry;
    // NOTE(review): create_time is presumably epoch milliseconds, as _get_file_age_days
    // subtracts it from Date.now() - confirm against the list_objects reply
    const age = _get_file_age_days(create_time);
    return { key, age, size, tags: tagging };
}
363+
351364
/**
 * get_candidates_by_expiration_rule collects the expiration candidates of a rule.
 * The work is delegated to the GPFS implementation when nb_native().fs.gpfs is set,
 * otherwise to the POSIX implementation.
 * @param {*} lifecycle_rule
 * @param {Object} bucket_json
 * @param {nb.ObjectSDK} object_sdk forwarded to the POSIX implementation only
 * @returns {Promise<Object[]>}
 */
async function get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk) {
    if (nb_native().fs.gpfs) {
        return get_candidates_by_expiration_rule_gpfs(lifecycle_rule, bucket_json);
    }
    return get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk);
}
365378

366379
/**
367-
*
368-
* @param {*} lifecycle_rule
369-
* @param {Object} bucket_json
380+
*
381+
* @param {*} lifecycle_rule
382+
* @param {Object} bucket_json
370383
* @returns {Promise<Object[]>}
371384
*/
372385
async function get_candidates_by_expiration_rule_gpfs(lifecycle_rule, bucket_json) {
@@ -375,20 +388,34 @@ async function get_candidates_by_expiration_rule_gpfs(lifecycle_rule, bucket_jso
375388
}
376389

377390
/**
 * get_candidates_by_expiration_rule_posix lists the bucket objects (restricted to the rule's
 * filter prefix, if any) and returns the keys of the objects that pass the rule's lifecycle filter.
 *
 * The listing is consumed page by page until the reply is no longer truncated, so objects
 * beyond the first list_objects batch are evaluated as well.
 * NOTE(review): paging assumes the list_objects reply exposes `is_truncated` / `next_marker`
 * and accepts `key_marker` to continue a listing - confirm against the ObjectSDK API.
 *
 * @param {*} lifecycle_rule reads `expiration` and `filter`
 * @param {Object} bucket_json reads `name`
 * @param {nb.ObjectSDK} object_sdk used for list_objects
 * @returns {Promise<Object[]>} array of `{ key }` candidates, empty when the rule should not be applied
 */
async function get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk) {
    const expiration = _get_expiration_time(lifecycle_rule.expiration);
    // a negative expiration means the rule should not be applied
    if (expiration < 0) return [];
    const filter_func = _build_lifecycle_filter({ filter: lifecycle_rule.filter, expiration });

    const filtered_objects = [];
    // TODO list_objects does not accept a filter - should maybe create a helper function
    // or add an argument for a filter in list object
    let key_marker;
    for (;;) {
        const list_params = { bucket: bucket_json.name, prefix: lifecycle_rule.filter?.prefix };
        // the first request is sent without a marker, exactly as a plain single listing
        if (key_marker !== undefined) list_params.key_marker = key_marker;
        const objects_list = await object_sdk.list_objects(list_params);

        for (const obj of objects_list.objects ?? []) {
            const object_info = _get_lifecycle_object_info_from_list_object_entry(obj);
            if (filter_func(object_info)) {
                filtered_objects.push({ key: object_info.key });
            }
        }

        const next_marker = objects_list.next_marker;
        // stop on the last page, and also when the marker is missing or does not advance,
        // so a misbehaving listing can never cause an endless loop
        if (!objects_list.is_truncated || !next_marker || next_marker === key_marker) break;
        key_marker = next_marker;
    }
    return filtered_objects;
}
387414

388415
/**
389416
* get_candidates_by_expiration_delete_marker_rule processes the expiration delete marker rule
390-
* @param {*} lifecycle_rule
391-
* @param {Object} bucket_json
417+
* @param {*} lifecycle_rule
418+
* @param {Object} bucket_json
392419
* @returns {Promise<Object[]>}
393420
*/
394421
async function get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, bucket_json) {
@@ -462,13 +489,13 @@ async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycl
462489
}
463490

464491
/**
 * Builds the lifecycle object info (key, age in days and tags) of a multipart upload
 * from its parsed create params and the stat that carries its modification time.
 * @param {Object} create_params_parsed reads `key` and `tagging`
 * @param {nb.NativeFSStats} stat reads `mtime` (a Date)
 * @returns {{key: *, age: Number, tags: *}}
 */
function _get_lifecycle_object_info_for_mpu(create_params_parsed, stat) {
    const { key, tagging } = create_params_parsed;
    const mtime_ms = stat.mtime.getTime();
    return { key, age: _get_file_age_days(mtime_ms), tags: tagging };
}
@@ -502,13 +529,32 @@ function _build_lifecycle_filter(params) {
502529

503530
/**
 * get the number of whole days that passed since the given modification time
 * @param {Number} mtime last modification time, in milliseconds since the epoch
 * @returns {Number} days since object was last modified, rounded down
 */
function _get_file_age_days(mtime) {
    const elapsed_ms = Date.now() - mtime;
    // milliseconds -> days: 24 hours * 60 minutes * 60 seconds * 1000 ms
    const elapsed_days = elapsed_ms / 24 / 60 / 60 / 1000;
    return Math.floor(elapsed_days);
}
511538

539+
/**
 * get the expiration time in days of an object
 * if rule is set with date, then rule is applied for all objects after that date
 * return -1 to indicate that the rule should not be applied, which is the case when -
 *   1. the date hasn't arrived yet
 *   2. the date cannot be parsed (never treat an invalid date as "already arrived")
 *   3. neither date nor days is defined (for example a rule that only sets expired_object_delete_marker)
 * return 0 in case date has arrived so expiration is true for all elements
 * return days in case days was defined and not date
 * @param {Object} expiration_rule reads `date` and `days`
 * @returns {Number}
 */
function _get_expiration_time(expiration_rule) {
    if (expiration_rule.date) {
        const expiration_date = new Date(expiration_rule.date).getTime();
        // an unparsable date yields NaN, and every comparison with NaN is false -
        // without this guard the rule would be treated as if its date had arrived
        if (Number.isNaN(expiration_date)) return -1;
        if (Date.now() < expiration_date) return -1;
        return 0;
    }
    // without days there is nothing to expire by age - returning undefined here would
    // slip through a caller's `expiration < 0` check
    if (expiration_rule.days === undefined || expiration_rule.days === null) return -1;
    return expiration_rule.days;
}
556+
557+
512558
/**
513559
* checks if tag query_tag is in the list tag_set
514560
* @param {Object} query_tag
@@ -545,7 +591,7 @@ function _file_contain_tags(object_info, filter_tags) {
545591
* update_lifecycle_rules_last_sync updates the last sync time of the lifecycle rule
546592
* @param {import('../sdk/config_fs').ConfigFS} config_fs
547593
* @param {Object} bucket_json
548-
* @param {String} rule_id
594+
* @param {String} rule_id
549595
* @param {number} index
550596
* @returns {Promise<Void>}
551597
*/
@@ -601,14 +647,14 @@ async function _call_op_and_update_status({ bucket_name = undefined, rule_id = u
601647
* 2. update times
602648
* 3. update errors
603649
* 4. update stats if the op is at rule level
604-
* @param {{
650+
* @param {{
605651
* op_name: string,
606652
* bucket_name?: string,
607-
* rule_id?: string,
653+
* rule_id?: string,
608654
* op_times: { start_time?: number, end_time?: number, took_ms?: number },
609655
* reply?: Object[],
610656
* error?: Error}
611-
* } params
657+
* } params
612658
* @returns {Void}
613659
*/
614660
function update_status({ bucket_name, rule_id, op_name, op_times, reply = [], error = undefined }) {
@@ -629,7 +675,7 @@ function update_status({ bucket_name, rule_id, op_name, op_times, reply = [], er
629675

630676
/**
631677
* _acc_stats accumulates stats for global/bucket stats
632-
* @param {Object} stats_acc
678+
* @param {Object} stats_acc
633679
* @param {Object} [cur_op_stats]
634680
* @returns {Object}
635681
*/
@@ -649,17 +695,17 @@ function _acc_stats(stats_acc, cur_op_stats = {}) {
649695

650696
/**
651697
* update_stats_on_status updates stats on rule context status and adds the rule status to the summarized bucket/global context stats
652-
* @param {{
653-
* op_name: string,
654-
* bucket_name: string,
655-
* rule_id: string,
656-
* op_times: {
657-
* start_time?: number,
658-
* end_time?: number,
659-
* took_ms?: number
698+
* @param {{
699+
* op_name: string,
700+
* bucket_name: string,
701+
* rule_id: string,
702+
* op_times: {
703+
* start_time?: number,
704+
* end_time?: number,
705+
* took_ms?: number
660706
* },
661707
* reply?: Object[],
662-
* }} params
708+
* }} params
663709
* @returns {Void}
664710
*/
665711
function update_stats_on_status({ bucket_name, rule_id, op_name, op_times, reply = [] }) {
@@ -689,9 +735,9 @@ function update_stats_on_status({ bucket_name, rule_id, op_name, op_times, reply
689735

690736
/**
691737
* _update_times_on_status updates start/end & took times in lifecycle status
692-
* @param {{op_name: String, op_times: {start_time?: number, end_time?: number, took_ms?: number },
738+
* @param {{op_name: String, op_times: {start_time?: number, end_time?: number, took_ms?: number },
693739
* bucket_name?: String, rule_id?: String}} params
694-
* @returns
740+
* @returns
695741
*/
696742
function _update_times_on_status({ op_name, op_times, bucket_name = undefined, rule_id = undefined }) {
697743
for (const [key, value] of Object.entries(op_times)) {
@@ -709,7 +755,7 @@ function _update_times_on_status({ op_name, op_times, bucket_name = undefined, r
709755
/**
710756
* _update_error_on_status records an error that occurred in the lifecycle status
711757
* @param {{error: Error, bucket_name?: string, rule_id?: string}} params
712-
* @returns
758+
* @returns
713759
*/
714760
function _update_error_on_status({ error, bucket_name = undefined, rule_id = undefined }) {
715761
if (!error) return;

0 commit comments

Comments
 (0)