@@ -12,6 +12,7 @@ const { v4: uuidv4 } = require('uuid');
12
12
const P = require ( '../util/promise' ) ;
13
13
const dbg = require ( '../util/debug_module' ) ( __filename ) ;
14
14
const config = require ( '../../config' ) ;
15
+ const crypto = require ( 'crypto' ) ;
15
16
const s3_utils = require ( '../endpoint/s3/s3_utils' ) ;
16
17
const error_utils = require ( '../util/error_utils' ) ;
17
18
const stream_utils = require ( '../util/stream_utils' ) ;
@@ -293,6 +294,16 @@ function to_fs_xattr(xattr) {
293
294
return _ . mapKeys ( xattr , ( val , key ) => XATTR_USER_PREFIX + key ) ;
294
295
}
295
296
297
+ /**
298
+ * get_random_delay returns a random delay number between base + min and max
299
+ * @param {number } base
300
+ * @param {number } min
301
+ * @param {number } max
302
+ * @returns {number }
303
+ */
304
+ function get_random_delay ( base , min , max ) {
305
+ return base + crypto . randomInt ( min , max ) ;
306
+ }
296
307
297
308
/**
298
309
* @typedef {{
@@ -1391,9 +1402,10 @@ class NamespaceFS {
1391
1402
}
1392
1403
1393
1404
// 1. get latest version_id
1394
- // 2. if versioning is suspended -
1405
+ // 2. if versioning is suspended -
1395
1406
// 2.1 if version ID of the latest version is null -
1396
- // 2.1.1 remove latest version
1407
+ // 2.1.1. if it's POSIX backend - unlink the null version
1408
+ // 2.1.2. if it's GPFS backend - nothing to do, the linkatif will override it
1397
1409
// 2.2 else (version ID of the latest version is unique or there is no latest version) -
1398
1410
// 2.2.1 remove a version (or delete marker) with null version ID from .versions/ (if exists)
1399
1411
// 3. if latest version exists -
@@ -1417,10 +1429,8 @@ class NamespaceFS {
1417
1429
const versioned_path = latest_ver_info && this . _get_version_path ( key , latest_ver_info . version_id_str ) ;
1418
1430
const versioned_info = latest_ver_info && await this . _get_version_info ( fs_context , versioned_path ) ;
1419
1431
1420
- gpfs_options = is_gpfs ?
1421
- await this . _open_files_gpfs ( fs_context , new_ver_tmp_path , latest_ver_path , upload_file ,
1422
- latest_ver_info , open_mode , undefined , versioned_info ) :
1423
- undefined ;
1432
+ gpfs_options = await this . _open_files_gpfs ( fs_context , new_ver_tmp_path , latest_ver_path , upload_file ,
1433
+ latest_ver_info , open_mode , undefined , versioned_info ) ;
1424
1434
const bucket_tmp_dir_path = this . get_bucket_tmpdir_full_path ( ) ;
1425
1435
dbg . log1 ( 'Namespace_fs._move_to_dest_version:' , latest_ver_info , new_ver_info , gpfs_options ) ;
1426
1436
@@ -1442,7 +1452,7 @@ class NamespaceFS {
1442
1452
dbg . log1 ( 'NamespaceFS._move_to_dest_version version ID of the latest version is a unique ID - the file will be moved it to .versions/ directory' ) ;
1443
1453
await native_fs_utils . _make_path_dirs ( versioned_path , fs_context ) ;
1444
1454
await native_fs_utils . safe_move ( fs_context , latest_ver_path , versioned_path , latest_ver_info ,
1445
- gpfs_options && gpfs_options . move_to_versions , bucket_tmp_dir_path ) ;
1455
+ gpfs_options ? .move_to_versions , bucket_tmp_dir_path ) ;
1446
1456
}
1447
1457
try {
1448
1458
// move new version to latest_ver_path (key path)
@@ -1457,9 +1467,10 @@ class NamespaceFS {
1457
1467
} catch ( err ) {
1458
1468
retries -= 1 ;
1459
1469
const should_retry = native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ;
1460
- dbg . warn ( `NamespaceFS._move_to_dest_version retrying retries=${ retries } should_retry=${ should_retry } ` +
1470
+ dbg . warn ( `NamespaceFS._move_to_dest_version error: retries=${ retries } should_retry=${ should_retry } ` +
1461
1471
` new_ver_tmp_path=${ new_ver_tmp_path } latest_ver_path=${ latest_ver_path } ` , err ) ;
1462
1472
if ( ! should_retry || retries <= 0 ) throw err ;
1473
+ await P . delay ( get_random_delay ( config . NSFS_RANDOM_DELAY_BASE , 0 , 50 ) ) ;
1463
1474
} finally {
1464
1475
if ( gpfs_options ) await this . _close_files_gpfs ( fs_context , gpfs_options . move_to_dst , open_mode ) ;
1465
1476
}
@@ -2680,9 +2691,18 @@ class NamespaceFS {
2680
2691
// if stat failed, undefined will return
2681
2692
}
2682
2693
2683
- // 1. if version exists in .versions/ folder - return its path
2684
- // 2. else if version is latest version - return latest version path
2685
- // 3. throw ENOENT error
2694
+ /**
2695
+ * _find_version_path returns the path of the version
2696
+ * 1. if version_id is not defined, it returns the key file
2697
+ * 2. else,
2698
+ * 2.1. check version format
2699
+ * 2.2. check if the latest version exists and it matches the version_id parameter the latest version path returns
2700
+ * 2.3. else, return the version path under .versions/
2701
+ * @param {import('./nb').NativeFSContext } fs_context
2702
+ * @param {{key: string, version_id?: string} } params
2703
+ * @param {boolean } [return_md_path]
2704
+ * @returns {Promise<string> }
2705
+ */
2686
2706
async _find_version_path ( fs_context , { key, version_id } , return_md_path ) {
2687
2707
const cur_ver_path = return_md_path ? this . _get_file_md_path ( { key } ) : this . _get_file_path ( { key } ) ;
2688
2708
if ( ! version_id ) return cur_ver_path ;
@@ -2726,6 +2746,11 @@ class NamespaceFS {
2726
2746
}
2727
2747
2728
2748
/**
2749
+ * _delete_single_object_versioned does the following -
2750
+ * if the deleted version is the latest - try to delete it from the latest version location
2751
+ * if the deleted version is in .versions/ - unlink the version
2752
+ * we call check_version_moved() in case of concurrent puts, the version might move to .versions/
2753
+ * if the version moved we will retry
2729
2754
* @param {nb.NativeFSContext } fs_context
2730
2755
* @param {string } key
2731
2756
* @param {string } version_id
@@ -2738,7 +2763,6 @@ class NamespaceFS {
2738
2763
* latest?: boolean;
2739
2764
* }>}
2740
2765
*/
2741
- // we can use this function when versioning is enabled or suspended
2742
2766
async _delete_single_object_versioned ( fs_context , key , version_id ) {
2743
2767
let retries = config . NSFS_RENAME_RETRIES ;
2744
2768
const is_gpfs = native_fs_utils . _is_gpfs ( fs_context ) ;
@@ -2754,19 +2778,19 @@ class NamespaceFS {
2754
2778
2755
2779
const deleted_latest = file_path === latest_version_path ;
2756
2780
if ( deleted_latest ) {
2757
- gpfs_options = is_gpfs ?
2758
- await this . _open_files_gpfs ( fs_context , file_path , undefined , undefined , undefined , undefined , true ) :
2759
- undefined ;
2781
+ gpfs_options = await this . _open_files_gpfs ( fs_context , file_path , undefined , undefined , undefined , undefined , true ) ;
2760
2782
const bucket_tmp_dir_path = this . get_bucket_tmpdir_full_path ( ) ;
2761
2783
await native_fs_utils . safe_unlink ( fs_context , file_path , version_info ,
2762
2784
gpfs_options ?. delete_version , bucket_tmp_dir_path ) ;
2785
+ await this . _check_version_moved ( fs_context , key , version_id ) ;
2763
2786
return { ...version_info , latest : true } ;
2764
2787
} else {
2765
2788
await native_fs_utils . unlink_ignore_enoent ( fs_context , file_path ) ;
2789
+ await this . _check_version_moved ( fs_context , key , version_id ) ;
2766
2790
}
2767
2791
return version_info ;
2768
2792
} catch ( err ) {
2769
- dbg . warn ( `NamespaceFS._delete_single_object_versioned: retrying retries=${ retries } file_path=${ file_path } ` , err ) ;
2793
+ dbg . warn ( `NamespaceFS._delete_single_object_versioned error: retries=${ retries } file_path=${ file_path } ` , err ) ;
2770
2794
retries -= 1 ;
2771
2795
// there are a few concurrency scenarios that might happen we should retry for -
2772
2796
// 1. the version id is the latest, concurrent put will might move the version id from being the latest to .versions/ -
@@ -2776,6 +2800,7 @@ class NamespaceFS {
2776
2800
// 3. concurrent delete of this version - will get ENOENT, doing a retry will return successfully
2777
2801
// after we will see that the version was already deleted
2778
2802
if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) throw err ;
2803
+ await P . delay ( get_random_delay ( config . NSFS_RANDOM_DELAY_BASE , 0 , 50 ) ) ;
2779
2804
} finally {
2780
2805
if ( gpfs_options ) await this . _close_files_gpfs ( fs_context , gpfs_options . delete_version , undefined , true ) ;
2781
2806
}
@@ -2878,14 +2903,15 @@ class NamespaceFS {
2878
2903
max_past_ver_info , bucket_tmp_dir_path ) ;
2879
2904
break ;
2880
2905
} catch ( err ) {
2906
+ dbg . warn ( `NamespaceFS: _promote_version_to_latest failed error: retries=${ retries } ` , err ) ;
2881
2907
retries -= 1 ;
2882
2908
if ( retries <= 0 ) throw err ;
2883
2909
if ( ! native_fs_utils . _is_gpfs ( fs_context ) && err . code === 'EEXIST' ) {
2884
2910
dbg . warn ( 'Namespace_fs._delete_version_id: latest version exist - skipping' ) ;
2885
2911
return ;
2886
2912
}
2887
2913
if ( err . code !== 'ENOENT' ) throw err ;
2888
- dbg . warn ( `NamespaceFS: _promote_version_to_latest failed retries= ${ retries } ` , err ) ;
2914
+ await P . delay ( get_random_delay ( config . NSFS_RANDOM_DELAY_BASE , 0 , 50 ) ) ;
2889
2915
}
2890
2916
}
2891
2917
}
@@ -2920,18 +2946,16 @@ class NamespaceFS {
2920
2946
2921
2947
dbg . log1 ( 'Namespace_fs._delete_latest_version:' , latest_ver_info , versioned_path , versioned_info ) ;
2922
2948
if ( latest_ver_info ) {
2923
- gpfs_options = is_gpfs ?
2924
- await this . _open_files_gpfs ( fs_context , latest_ver_path ,
2925
- undefined , undefined , undefined , undefined , true , versioned_info ) :
2926
- undefined ;
2949
+ gpfs_options = await this . _open_files_gpfs ( fs_context , latest_ver_path , undefined , undefined , undefined ,
2950
+ undefined , true , versioned_info ) ;
2927
2951
2928
2952
const suspended_and_latest_is_not_null = this . _is_versioning_suspended ( ) &&
2929
2953
latest_ver_info . version_id_str !== NULL_VERSION_ID ;
2930
2954
const bucket_tmp_dir_path = this . get_bucket_tmpdir_full_path ( ) ;
2931
2955
if ( this . _is_versioning_enabled ( ) || suspended_and_latest_is_not_null ) {
2932
2956
await native_fs_utils . _make_path_dirs ( versioned_path , fs_context ) ;
2933
2957
await native_fs_utils . safe_move ( fs_context , latest_ver_path , versioned_path , latest_ver_info ,
2934
- gpfs_options && gpfs_options . delete_version , bucket_tmp_dir_path ) ;
2958
+ gpfs_options ? .delete_version , bucket_tmp_dir_path ) ;
2935
2959
if ( suspended_and_latest_is_not_null ) {
2936
2960
// remove a version (or delete marker) with null version ID from .versions/ (if exists)
2937
2961
await this . _delete_null_version_from_versions_directory ( params . key , fs_context ) ;
@@ -2945,9 +2969,10 @@ class NamespaceFS {
2945
2969
}
2946
2970
break ;
2947
2971
} catch ( err ) {
2948
- dbg . warn ( `NamespaceFS._delete_latest_version: Retrying retries=${ retries } latest_ver_path=${ latest_ver_path } ` , err ) ;
2972
+ dbg . warn ( `NamespaceFS._delete_latest_version error: retries=${ retries } latest_ver_path=${ latest_ver_path } ` , err ) ;
2949
2973
retries -= 1 ;
2950
2974
if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) throw err ;
2975
+ await P . delay ( get_random_delay ( config . NSFS_RANDOM_DELAY_BASE , 0 , 50 ) ) ;
2951
2976
} finally {
2952
2977
if ( gpfs_options ) await this . _close_files_gpfs ( fs_context , gpfs_options . delete_version , undefined , true ) ;
2953
2978
}
@@ -2965,29 +2990,29 @@ class NamespaceFS {
2965
2990
// This function removes an object version or delete marker with a null version ID inside .version/ directory
2966
2991
async _delete_null_version_from_versions_directory ( key , fs_context ) {
2967
2992
const is_gpfs = native_fs_utils . _is_gpfs ( fs_context ) ;
2968
- let retries = config . NSFS_RENAME_RETRIES ;
2969
2993
const null_versioned_path = this . _get_version_path ( key , NULL_VERSION_ID ) ;
2970
2994
await this . _check_path_in_bucket_boundaries ( fs_context , null_versioned_path ) ;
2971
-
2995
+ let gpfs_options ;
2996
+ let retries = config . NSFS_RENAME_RETRIES ;
2972
2997
for ( ; ; ) {
2973
2998
try {
2974
2999
const null_versioned_path_info = await this . _get_version_info ( fs_context , null_versioned_path ) ;
2975
3000
dbg . log1 ( 'Namespace_fs._delete_null_version_from_versions_directory:' , null_versioned_path , null_versioned_path_info ) ;
2976
- if ( null_versioned_path_info ) {
2977
- const gpfs_options = is_gpfs ?
2978
- await this . _open_files_gpfs ( fs_context , null_versioned_path , undefined , undefined , undefined , undefined , true ) :
2979
- undefined ;
2980
- const bucket_tmp_dir_path = this . get_bucket_tmpdir_full_path ( ) ;
2981
- await native_fs_utils . safe_unlink ( fs_context , null_versioned_path , null_versioned_path_info ,
2982
- gpfs_options ?. delete_version , bucket_tmp_dir_path ) ;
3001
+ if ( ! null_versioned_path_info ) return ;
2983
3002
2984
- if ( gpfs_options ) await this . _close_files_gpfs ( fs_context , gpfs_options ?. delete_version , undefined , true ) ;
2985
- }
3003
+ gpfs_options = await this . _open_files_gpfs ( fs_context , null_versioned_path , undefined , undefined , undefined ,
3004
+ undefined , true ) ;
3005
+ const bucket_tmp_dir_path = this . get_bucket_tmpdir_full_path ( ) ;
3006
+ await native_fs_utils . safe_unlink ( fs_context , null_versioned_path , null_versioned_path_info ,
3007
+ gpfs_options ?. delete_version , bucket_tmp_dir_path ) ;
2986
3008
break ;
2987
3009
} catch ( err ) {
3010
+ dbg . warn ( `NamespaceFS._delete_null_version_from_versions_directory error: retries=${ retries } null_versioned_path=${ null_versioned_path } ` , err ) ;
2988
3011
retries -= 1 ;
2989
3012
if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) throw err ;
2990
- dbg . warn ( `NamespaceFS._delete_null_version_from_versions_directory Retrying retries=${ retries } null_versioned_path=${ null_versioned_path } ` , err ) ;
3013
+ await P . delay ( get_random_delay ( config . NSFS_RANDOM_DELAY_BASE , 0 , 50 ) ) ;
3014
+ } finally {
3015
+ if ( gpfs_options ) await this . _close_files_gpfs ( fs_context , gpfs_options . delete_version , undefined , true ) ;
2991
3016
}
2992
3017
}
2993
3018
}
@@ -3018,13 +3043,14 @@ class NamespaceFS {
3018
3043
await nb_native ( ) . fs . rename ( fs_context , upload_params . upload_path , file_path ) ;
3019
3044
return delete_marker_version_id ;
3020
3045
} catch ( err ) {
3046
+ dbg . warn ( `NamespaceFS: _create_delete_marker failed error: retries=${ retries } ` , err ) ;
3021
3047
retries -= 1 ;
3022
3048
if ( retries <= 0 ) throw err ;
3023
3049
if ( err . code === 'EEXIST' ) {
3024
3050
dbg . warn ( `NamespaceFS: _create_delete_marker already exists, success` , err ) ;
3025
3051
return delete_marker_version_id ;
3026
3052
}
3027
- dbg . warn ( `NamespaceFS: _create_delete_marker failed retries= ${ retries } ` , err ) ;
3053
+ await P . delay ( get_random_delay ( config . NSFS_RANDOM_DELAY_BASE , 0 , 50 ) ) ;
3028
3054
} finally {
3029
3055
if ( upload_params ) await this . complete_object_upload_finally ( undefined , undefined , upload_params . target_file , fs_context ) ;
3030
3056
}
@@ -3070,6 +3096,8 @@ class NamespaceFS {
3070
3096
// eslint-disable-next-line max-params
3071
3097
async _open_files_gpfs ( fs_context , src_path , dst_path , upload_or_dir_file , dst_ver_info , open_mode , delete_version , versioned_info ) {
3072
3098
dbg . log1 ( 'Namespace_fs._open_files_gpfs:' , src_path , src_path && path . dirname ( src_path ) , dst_path , upload_or_dir_file , dst_ver_info , open_mode , delete_version , versioned_info ) ;
3099
+ const is_gpfs = native_fs_utils . _is_gpfs ( fs_context ) ;
3100
+ if ( ! is_gpfs ) return ;
3073
3101
3074
3102
let src_file ;
3075
3103
let dst_file ;
@@ -3133,6 +3161,22 @@ class NamespaceFS {
3133
3161
}
3134
3162
}
3135
3163
3164
+ /**
3165
+ * _check_version_moved recieves key and version_id and checks if the version still exists in one of the optional locations
3166
+ * latest version location or .versions/ directory
3167
+ * @param {import('./nb').NativeFSContext } fs_context
3168
+ * @param {string } key
3169
+ * @param {string } version_id
3170
+ */
3171
+ async _check_version_moved ( fs_context , key , version_id ) {
3172
+ const latest_version_path = this . _get_file_path ( { key } ) ;
3173
+ const versioned_path = this . _get_version_path ( key , version_id ) ;
3174
+ const versioned_path_info = await this . _get_version_info ( fs_context , versioned_path ) ;
3175
+ if ( versioned_path_info ) throw error_utils . new_error_code ( 'VERSION_MOVED' , `version file moved from latest ${ latest_version_path } to .versions/ ${ versioned_path } , retrying` ) ;
3176
+ const latest_ver_info = await this . _get_version_info ( fs_context , latest_version_path ) ;
3177
+ if ( latest_ver_info && latest_ver_info . version_id_str === version_id ) throw error_utils . new_error_code ( 'VERSION_MOVED' , `version file moved from .versions/ ${ versioned_path } to latest ${ latest_version_path } , retrying` ) ;
3178
+ }
3179
+
3136
3180
async _throw_if_storage_class_not_supported ( storage_class ) {
3137
3181
if ( ! await this . _is_storage_class_supported ( storage_class ) ) {
3138
3182
throw new S3Error ( S3Error . InvalidStorageClass ) ;
0 commit comments