@@ -40,6 +40,7 @@ const TIMED_OPS = Object.freeze({
40
40
RUN_LIFECYLE : 'run_lifecycle' ,
41
41
LIST_BUCKETS : 'list_buckets' ,
42
42
CREATE_GPFS_CANDIDATES_FILES : 'create_gpfs_candidates_files' ,
43
+ CREATE_GPFS_CANDIDATE_FILE_BY_ILM_POLICY : 'create_candidates_file_by_gpfs_ilm_policy' ,
43
44
PROCESS_BUCKETS : 'process_buckets' ,
44
45
PROCESS_BUCKET : 'process_bucket' ,
45
46
PROCESS_RULE : 'process_rule' ,
@@ -82,7 +83,8 @@ class NCLifecycle {
82
83
lifecycle_run_times : { } ,
83
84
total_stats : this . _get_default_stats ( ) ,
84
85
state : { is_finished : false } ,
85
- buckets_statuses : { }
86
+ buckets_statuses : { } ,
87
+ mount_points_statuses : { }
86
88
} ;
87
89
this . return_short_status = options . short_status || false ;
88
90
this . disable_service_validation = options . disable_service_validation || false ;
@@ -787,14 +789,15 @@ class NCLifecycle {
787
789
* @param {{
788
790
* op_name: string;
789
791
* op_func: () => Promise<T>;
792
+ * mount_point?: string,
790
793
* bucket_name?: string,
791
794
* rule_id?: string
792
795
* }} params
793
796
* @returns {Promise<T> }
794
797
*/
795
- async _call_op_and_update_status ( { bucket_name = undefined , rule_id = undefined , op_name, op_func } ) {
798
+ async _call_op_and_update_status ( { mount_point = undefined , bucket_name = undefined , rule_id = undefined , op_name, op_func } ) {
796
799
const start_time = Date . now ( ) ;
797
- const update_options = { op_name, bucket_name, rule_id } ;
800
+ const update_options = { mount_point , op_name, bucket_name, rule_id } ;
798
801
let end_time ;
799
802
let took_ms ;
800
803
let error ;
@@ -815,13 +818,15 @@ class NCLifecycle {
815
818
}
816
819
817
820
/**
818
- * update_status updates rule/bucket/global based on the given parameters
821
+ * update_status updates rule/bucket/mount_point/ global based on the given parameters
819
822
* 1. initalize statuses/times/stats per level
820
823
* 2. update times
821
824
* 3. update errors
822
825
* 4. update stats if the op is at rule level
826
+ * Note - on mount_point we won't update stats/state
823
827
* @param {{
824
828
* op_name: string,
829
+ * mount_point?: string,
825
830
* bucket_name?: string,
826
831
* rule_id?: string,
827
832
* op_times: { start_time?: number, end_time?: number, took_ms?: number },
@@ -830,21 +835,23 @@ class NCLifecycle {
830
835
* } params
831
836
* @returns {Void }
832
837
*/
833
- update_status ( { bucket_name, rule_id, op_name, op_times, reply = [ ] , error = undefined } ) {
838
+ update_status ( { mount_point , bucket_name, rule_id, op_name, op_times, reply = [ ] , error = undefined } ) {
834
839
if ( op_times . start_time ) {
835
840
if ( op_name === TIMED_OPS . PROCESS_RULE ) {
836
841
this . init_rule_status ( bucket_name , rule_id ) ;
837
842
} else if ( op_name === TIMED_OPS . PROCESS_BUCKET ) {
838
843
this . init_bucket_status ( bucket_name ) ;
844
+ } else if ( op_name === TIMED_OPS . CREATE_GPFS_CANDIDATE_FILE_BY_ILM_POLICY ) {
845
+ this . init_mount_status ( mount_point ) ;
839
846
}
840
847
}
841
848
if ( op_times . end_time ) {
842
849
if ( op_name === TIMED_OPS . PROCESS_RULE ) {
843
850
this . update_rule_status_is_finished ( bucket_name , rule_id ) ;
844
851
}
845
852
}
846
- this . _update_times_on_status ( { op_name, op_times, bucket_name, rule_id } ) ;
847
- this . _update_error_on_status ( { error, bucket_name, rule_id } ) ;
853
+ this . _update_times_on_status ( { op_name, op_times, mount_point , bucket_name, rule_id } ) ;
854
+ this . _update_error_on_status ( { error, mount_point , bucket_name, rule_id } ) ;
848
855
if ( bucket_name && rule_id ) {
849
856
this . update_stats_on_status ( { bucket_name, rule_id, op_name, op_times, reply } ) ;
850
857
}
@@ -918,16 +925,18 @@ class NCLifecycle {
918
925
/**
919
926
* _update_times_on_status updates start/end & took times in lifecycle status
920
927
* @param {{op_name: String, op_times: {start_time?: number, end_time?: number, took_ms?: number },
921
- * bucket_name?: String, rule_id?: String }} params
928
+ * mount_point ?: String, bucket_name?: String, rule_id?: String }} params
922
929
* @returns
923
930
*/
924
- _update_times_on_status ( { op_name, op_times, bucket_name = undefined , rule_id = undefined } ) {
931
+ _update_times_on_status ( { op_name, op_times, mount_point = undefined , bucket_name = undefined , rule_id = undefined } ) {
925
932
for ( const [ key , value ] of Object . entries ( op_times ) ) {
926
933
const status_key = op_name + '_' + key ;
927
934
if ( bucket_name && rule_id ) {
928
935
this . lifecycle_run_status . buckets_statuses [ bucket_name ] . rules_statuses [ rule_id ] . rule_process_times [ status_key ] = value ;
929
936
} else if ( bucket_name ) {
930
937
this . lifecycle_run_status . buckets_statuses [ bucket_name ] . bucket_process_times [ status_key ] = value ;
938
+ } else if ( mount_point ) {
939
+ this . lifecycle_run_status . mount_points_statuses [ mount_point ] . mount_point_process_times [ status_key ] = value ;
931
940
} else {
932
941
this . lifecycle_run_status . lifecycle_run_times [ status_key ] = value ;
933
942
}
@@ -936,15 +945,17 @@ class NCLifecycle {
936
945
937
946
/**
938
947
* _update_error_on_status updates an error occured in lifecycle status
939
- * @param {{error: Error, bucket_name?: string, rule_id?: string} } params
948
+ * @param {{error: Error, mount_point?: string, bucket_name?: string, rule_id?: string} } params
940
949
* @returns
941
950
*/
942
- _update_error_on_status ( { error, bucket_name = undefined , rule_id = undefined } ) {
951
+ _update_error_on_status ( { error, mount_point = undefined , bucket_name = undefined , rule_id = undefined } ) {
943
952
if ( ! error ) return ;
944
953
if ( bucket_name && rule_id ) {
945
954
( this . lifecycle_run_status . buckets_statuses [ bucket_name ] . rules_statuses [ rule_id ] . errors ??= [ ] ) . push ( error . message ) ;
946
955
} else if ( bucket_name ) {
947
956
( this . lifecycle_run_status . buckets_statuses [ bucket_name ] . errors ??= [ ] ) . push ( error . message ) ;
957
+ } else if ( mount_point ) {
958
+ ( this . lifecycle_run_status . mount_points_statuses [ mount_point ] . errors ??= [ ] ) . push ( error . message ) ;
948
959
} else {
949
960
( this . lifecycle_run_status . errors ??= [ ] ) . push ( error . message ) ;
950
961
}
@@ -985,6 +996,18 @@ class NCLifecycle {
985
996
return this . lifecycle_run_status . buckets_statuses [ bucket_name ] ;
986
997
}
987
998
999
+ /**
1000
+ * init the mount_point status object statuses if they don't exist
1001
+ * @param {string } mount_point
1002
+ * @returns {Object } created or existing mount_point status
1003
+ */
1004
+ init_mount_status ( mount_point ) {
1005
+ this . lifecycle_run_status . mount_points_statuses [ mount_point ] ??= { } ;
1006
+ this . lifecycle_run_status . mount_points_statuses [ mount_point ] . mount_point_process_times ??= { } ;
1007
+ return this . lifecycle_run_status . mount_points_statuses [ mount_point ] ;
1008
+ }
1009
+
1010
+
988
1011
/**
989
1012
* init the rule status object statuses if they don't exist
990
1013
* @param {string } bucket_name
@@ -1213,11 +1236,22 @@ class NCLifecycle {
1213
1236
1214
1237
await native_fs_utils . _create_path ( ILM_POLICIES_TMP_DIR , this . non_gpfs_fs_context , config . BASE_MODE_CONFIG_DIR ) ;
1215
1238
await native_fs_utils . _create_path ( ILM_CANDIDATES_TMP_DIR , this . non_gpfs_fs_context , config . BASE_MODE_CONFIG_DIR ) ;
1216
- for ( const [ mount_point , policy ] of Object . entries ( mount_point_to_policy_map ) ) {
1217
- if ( policy === '' ) continue ;
1218
- const ilm_policy_path = await this . write_tmp_ilm_policy ( mount_point , policy ) ;
1219
- await this . create_candidates_file_by_gpfs_ilm_policy ( mount_point , ilm_policy_path ) ;
1220
- }
1239
+ await P . map_with_concurrency ( config . NC_LIFECYCLE_GPFS_MMAPPLY_ILM_POLICY_CONCURRENCY ,
1240
+ Object . entries ( mount_point_to_policy_map ) , async ( [ mount_point , policy ] ) => {
1241
+ try {
1242
+ if ( policy === '' ) return ;
1243
+ await this . _call_op_and_update_status ( {
1244
+ mount_point,
1245
+ op_name : TIMED_OPS . CREATE_GPFS_CANDIDATE_FILE_BY_ILM_POLICY ,
1246
+ op_func : async ( ) => {
1247
+ const ilm_policy_path = await this . write_tmp_ilm_policy ( mount_point , policy ) ;
1248
+ await this . create_candidates_file_by_gpfs_ilm_policy ( mount_point , ilm_policy_path ) ;
1249
+ }
1250
+ } ) ;
1251
+ } catch ( err ) {
1252
+ dbg . error ( 'create_candidates_file_by_gpfs_ilm_policy failed with error' , err , err . code , err . message ) ;
1253
+ }
1254
+ } ) ;
1221
1255
}
1222
1256
1223
1257
/**
0 commit comments