Skip to content

Commit bf7d5be

Browse files
committed
NC | Concurrency & refactoring | Add delay, version move checks and GPFS refactoring
Signed-off-by: Romy <35330373+romayalon@users.noreply.github.com>
1 parent bb4a8b3 commit bf7d5be

File tree

4 files changed

+85
-44
lines changed

4 files changed

+85
-44
lines changed

config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ config.NSFS_SEM_WARNING_TIMEOUT = 10 * 60 * 1000;
766766
// number of rename retries in case of deleted destination directory
767767
config.NSFS_RENAME_RETRIES = 10;
768768
config.NSFS_MKDIR_PATH_RETRIES = 3;
769+
config.NSFS_RANDOM_DELAY_BASE = 70;
769770

770771
config.NSFS_VERSIONING_ENABLED = true;
771772
config.NSFS_UPDATE_ISSUES_REPORT_ENABLED = true;

src/sdk/namespace_fs.js

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const { v4: uuidv4 } = require('uuid');
1212
const P = require('../util/promise');
1313
const dbg = require('../util/debug_module')(__filename);
1414
const config = require('../../config');
15+
const crypto = require('crypto');
1516
const s3_utils = require('../endpoint/s3/s3_utils');
1617
const error_utils = require('../util/error_utils');
1718
const stream_utils = require('../util/stream_utils');
@@ -293,6 +294,16 @@ function to_fs_xattr(xattr) {
293294
return _.mapKeys(xattr, (val, key) => XATTR_USER_PREFIX + key);
294295
}
295296

297+
/**
298+
* get_random_delay returns a random delay number between base + min and max
299+
* @param {number} base
300+
* @param {number} min
301+
* @param {number} max
302+
* @returns {number}
303+
*/
304+
function get_random_delay(base, min, max) {
305+
return base + crypto.randomInt(min, max);
306+
}
296307

297308
/**
298309
* @typedef {{
@@ -1391,9 +1402,10 @@ class NamespaceFS {
13911402
}
13921403

13931404
// 1. get latest version_id
1394-
// 2. if versioning is suspended -
1405+
// 2. if versioning is suspended -
13951406
// 2.1 if version ID of the latest version is null -
1396-
// 2.1.1 remove latest version
1407+
// 2.1.1. if it's POSIX backend - unlink the null version
1408+
// 2.1.2. if it's GPFS backend - nothing to do, the linkatif will override it
13971409
// 2.2 else (version ID of the latest version is unique or there is no latest version) -
13981410
// 2.2.1 remove a version (or delete marker) with null version ID from .versions/ (if exists)
13991411
// 3. if latest version exists -
@@ -1417,10 +1429,8 @@ class NamespaceFS {
14171429
const versioned_path = latest_ver_info && this._get_version_path(key, latest_ver_info.version_id_str);
14181430
const versioned_info = latest_ver_info && await this._get_version_info(fs_context, versioned_path);
14191431

1420-
gpfs_options = is_gpfs ?
1421-
await this._open_files_gpfs(fs_context, new_ver_tmp_path, latest_ver_path, upload_file,
1422-
latest_ver_info, open_mode, undefined, versioned_info) :
1423-
undefined;
1432+
gpfs_options = await this._open_files_gpfs(fs_context, new_ver_tmp_path, latest_ver_path, upload_file,
1433+
latest_ver_info, open_mode, undefined, versioned_info);
14241434
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
14251435
dbg.log1('Namespace_fs._move_to_dest_version:', latest_ver_info, new_ver_info, gpfs_options);
14261436

@@ -1442,7 +1452,7 @@ class NamespaceFS {
14421452
dbg.log1('NamespaceFS._move_to_dest_version version ID of the latest version is a unique ID - the file will be moved it to .versions/ directory');
14431453
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
14441454
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
1445-
gpfs_options && gpfs_options.move_to_versions, bucket_tmp_dir_path);
1455+
gpfs_options?.move_to_versions, bucket_tmp_dir_path);
14461456
}
14471457
try {
14481458
// move new version to latest_ver_path (key path)
@@ -1457,9 +1467,10 @@ class NamespaceFS {
14571467
} catch (err) {
14581468
retries -= 1;
14591469
const should_retry = native_fs_utils.should_retry_link_unlink(is_gpfs, err);
1460-
dbg.warn(`NamespaceFS._move_to_dest_version retrying retries=${retries} should_retry=${should_retry}` +
1470+
dbg.warn(`NamespaceFS._move_to_dest_version error: retries=${retries} should_retry=${should_retry}` +
14611471
` new_ver_tmp_path=${new_ver_tmp_path} latest_ver_path=${latest_ver_path}`, err);
14621472
if (!should_retry || retries <= 0) throw err;
1473+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
14631474
} finally {
14641475
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.move_to_dst, open_mode);
14651476
}
@@ -2680,9 +2691,18 @@ class NamespaceFS {
26802691
// if stat failed, undefined will return
26812692
}
26822693

2683-
// 1. if version exists in .versions/ folder - return its path
2684-
// 2. else if version is latest version - return latest version path
2685-
// 3. throw ENOENT error
2694+
/**
2695+
* _find_version_path returns the path of the version
2696+
* 1. if version_id is not defined, it returns the key file
2697+
* 2. else,
2698+
* 2.1. check version format
2699+
* 2.2. check if the latest version exists and it matches the version_id parameter the latest version path returns
2700+
* 2.3. else, return the version path under .versions/
2701+
* @param {import('./nb').NativeFSContext} fs_context
2702+
* @param {{key: string, version_id?: string}} params
2703+
* @param {boolean} [return_md_path]
2704+
* @returns {Promise<string>}
2705+
*/
26862706
async _find_version_path(fs_context, { key, version_id }, return_md_path) {
26872707
const cur_ver_path = return_md_path ? this._get_file_md_path({ key }) : this._get_file_path({ key });
26882708
if (!version_id) return cur_ver_path;
@@ -2726,6 +2746,11 @@ class NamespaceFS {
27262746
}
27272747

27282748
/**
2749+
* _delete_single_object_versioned does the following -
2750+
* if the deleted version is the latest - try to delete it from the latest version location
2751+
* if the deleted version is in .versions/ - unlink the version
2752+
* we call check_version_moved() in case of concurrent puts, the version might move to .versions/
2753+
* if the version moved we will retry
27292754
* @param {nb.NativeFSContext} fs_context
27302755
* @param {string} key
27312756
* @param {string} version_id
@@ -2738,7 +2763,6 @@ class NamespaceFS {
27382763
* latest?: boolean;
27392764
* }>}
27402765
*/
2741-
// we can use this function when versioning is enabled or suspended
27422766
async _delete_single_object_versioned(fs_context, key, version_id) {
27432767
let retries = config.NSFS_RENAME_RETRIES;
27442768
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
@@ -2754,19 +2778,19 @@ class NamespaceFS {
27542778

27552779
const deleted_latest = file_path === latest_version_path;
27562780
if (deleted_latest) {
2757-
gpfs_options = is_gpfs ?
2758-
await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true) :
2759-
undefined;
2781+
gpfs_options = await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true);
27602782
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
27612783
await native_fs_utils.safe_unlink(fs_context, file_path, version_info,
27622784
gpfs_options?.delete_version, bucket_tmp_dir_path);
2785+
await this._check_version_moved(fs_context, key, version_id);
27632786
return { ...version_info, latest: true };
27642787
} else {
27652788
await native_fs_utils.unlink_ignore_enoent(fs_context, file_path);
2789+
await this._check_version_moved(fs_context, key, version_id);
27662790
}
27672791
return version_info;
27682792
} catch (err) {
2769-
dbg.warn(`NamespaceFS._delete_single_object_versioned: retrying retries=${retries} file_path=${file_path}`, err);
2793+
dbg.warn(`NamespaceFS._delete_single_object_versioned error: retries=${retries} file_path=${file_path}`, err);
27702794
retries -= 1;
27712795
// there are a few concurrency scenarios that might happen we should retry for -
27722796
// 1. the version id is the latest, concurrent put will might move the version id from being the latest to .versions/ -
@@ -2776,6 +2800,7 @@ class NamespaceFS {
27762800
// 3. concurrent delete of this version - will get ENOENT, doing a retry will return successfully
27772801
// after we will see that the version was already deleted
27782802
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
2803+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
27792804
} finally {
27802805
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
27812806
}
@@ -2878,14 +2903,15 @@ class NamespaceFS {
28782903
max_past_ver_info, bucket_tmp_dir_path);
28792904
break;
28802905
} catch (err) {
2906+
dbg.warn(`NamespaceFS: _promote_version_to_latest failed error: retries=${retries}`, err);
28812907
retries -= 1;
28822908
if (retries <= 0) throw err;
28832909
if (!native_fs_utils._is_gpfs(fs_context) && err.code === 'EEXIST') {
28842910
dbg.warn('Namespace_fs._delete_version_id: latest version exist - skipping');
28852911
return;
28862912
}
28872913
if (err.code !== 'ENOENT') throw err;
2888-
dbg.warn(`NamespaceFS: _promote_version_to_latest failed retries=${retries}`, err);
2914+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
28892915
}
28902916
}
28912917
}
@@ -2920,18 +2946,16 @@ class NamespaceFS {
29202946

29212947
dbg.log1('Namespace_fs._delete_latest_version:', latest_ver_info, versioned_path, versioned_info);
29222948
if (latest_ver_info) {
2923-
gpfs_options = is_gpfs ?
2924-
await this._open_files_gpfs(fs_context, latest_ver_path,
2925-
undefined, undefined, undefined, undefined, true, versioned_info) :
2926-
undefined;
2949+
gpfs_options = await this._open_files_gpfs(fs_context, latest_ver_path, undefined, undefined, undefined,
2950+
undefined, true, versioned_info);
29272951

29282952
const suspended_and_latest_is_not_null = this._is_versioning_suspended() &&
29292953
latest_ver_info.version_id_str !== NULL_VERSION_ID;
29302954
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
29312955
if (this._is_versioning_enabled() || suspended_and_latest_is_not_null) {
29322956
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
29332957
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
2934-
gpfs_options && gpfs_options.delete_version, bucket_tmp_dir_path);
2958+
gpfs_options?.delete_version, bucket_tmp_dir_path);
29352959
if (suspended_and_latest_is_not_null) {
29362960
// remove a version (or delete marker) with null version ID from .versions/ (if exists)
29372961
await this._delete_null_version_from_versions_directory(params.key, fs_context);
@@ -2945,9 +2969,10 @@ class NamespaceFS {
29452969
}
29462970
break;
29472971
} catch (err) {
2948-
dbg.warn(`NamespaceFS._delete_latest_version: Retrying retries=${retries} latest_ver_path=${latest_ver_path}`, err);
2972+
dbg.warn(`NamespaceFS._delete_latest_version error: retries=${retries} latest_ver_path=${latest_ver_path}`, err);
29492973
retries -= 1;
29502974
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
2975+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
29512976
} finally {
29522977
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
29532978
}
@@ -2965,29 +2990,29 @@ class NamespaceFS {
29652990
// This function removes an object version or delete marker with a null version ID inside .version/ directory
29662991
async _delete_null_version_from_versions_directory(key, fs_context) {
29672992
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
2968-
let retries = config.NSFS_RENAME_RETRIES;
29692993
const null_versioned_path = this._get_version_path(key, NULL_VERSION_ID);
29702994
await this._check_path_in_bucket_boundaries(fs_context, null_versioned_path);
2971-
2995+
let gpfs_options;
2996+
let retries = config.NSFS_RENAME_RETRIES;
29722997
for (;;) {
29732998
try {
29742999
const null_versioned_path_info = await this._get_version_info(fs_context, null_versioned_path);
29753000
dbg.log1('Namespace_fs._delete_null_version_from_versions_directory:', null_versioned_path, null_versioned_path_info);
2976-
if (null_versioned_path_info) {
2977-
const gpfs_options = is_gpfs ?
2978-
await this._open_files_gpfs(fs_context, null_versioned_path, undefined, undefined, undefined, undefined, true) :
2979-
undefined;
2980-
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
2981-
await native_fs_utils.safe_unlink(fs_context, null_versioned_path, null_versioned_path_info,
2982-
gpfs_options?.delete_version, bucket_tmp_dir_path);
3001+
if (!null_versioned_path_info) return;
29833002

2984-
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options?.delete_version, undefined, true);
2985-
}
3003+
gpfs_options = await this._open_files_gpfs(fs_context, null_versioned_path, undefined, undefined, undefined,
3004+
undefined, true);
3005+
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
3006+
await native_fs_utils.safe_unlink(fs_context, null_versioned_path, null_versioned_path_info,
3007+
gpfs_options?.delete_version, bucket_tmp_dir_path);
29863008
break;
29873009
} catch (err) {
3010+
dbg.warn(`NamespaceFS._delete_null_version_from_versions_directory error: retries=${retries} null_versioned_path=${null_versioned_path}`, err);
29883011
retries -= 1;
29893012
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
2990-
dbg.warn(`NamespaceFS._delete_null_version_from_versions_directory Retrying retries=${retries} null_versioned_path=${null_versioned_path}`, err);
3013+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
3014+
} finally {
3015+
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
29913016
}
29923017
}
29933018
}
@@ -3018,13 +3043,14 @@ class NamespaceFS {
30183043
await nb_native().fs.rename(fs_context, upload_params.upload_path, file_path);
30193044
return delete_marker_version_id;
30203045
} catch (err) {
3046+
dbg.warn(`NamespaceFS: _create_delete_marker failed error: retries=${retries}`, err);
30213047
retries -= 1;
30223048
if (retries <= 0) throw err;
30233049
if (err.code === 'EEXIST') {
30243050
dbg.warn(`NamespaceFS: _create_delete_marker already exists, success`, err);
30253051
return delete_marker_version_id;
30263052
}
3027-
dbg.warn(`NamespaceFS: _create_delete_marker failed retries=${retries}`, err);
3053+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
30283054
} finally {
30293055
if (upload_params) await this.complete_object_upload_finally(undefined, undefined, upload_params.target_file, fs_context);
30303056
}
@@ -3070,6 +3096,8 @@ class NamespaceFS {
30703096
// eslint-disable-next-line max-params
30713097
async _open_files_gpfs(fs_context, src_path, dst_path, upload_or_dir_file, dst_ver_info, open_mode, delete_version, versioned_info) {
30723098
dbg.log1('Namespace_fs._open_files_gpfs:', src_path, src_path && path.dirname(src_path), dst_path, upload_or_dir_file, dst_ver_info, open_mode, delete_version, versioned_info);
3099+
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
3100+
if (!is_gpfs) return;
30733101

30743102
let src_file;
30753103
let dst_file;
@@ -3133,6 +3161,22 @@ class NamespaceFS {
31333161
}
31343162
}
31353163

3164+
/**
3165+
* _check_version_moved recieves key and version_id and checks if the version still exists in one of the optional locations
3166+
* latest version location or .versions/ directory
3167+
* @param {import('./nb').NativeFSContext} fs_context
3168+
* @param {string} key
3169+
* @param {string} version_id
3170+
*/
3171+
async _check_version_moved(fs_context, key, version_id) {
3172+
const latest_version_path = this._get_file_path({ key });
3173+
const versioned_path = this._get_version_path(key, version_id);
3174+
const versioned_path_info = await this._get_version_info(fs_context, versioned_path);
3175+
if (versioned_path_info) throw error_utils.new_error_code('VERSION_MOVED', `version file moved from latest ${latest_version_path} to .versions/ ${versioned_path}, retrying`);
3176+
const latest_ver_info = await this._get_version_info(fs_context, latest_version_path);
3177+
if (latest_ver_info && latest_ver_info.version_id_str === version_id) throw error_utils.new_error_code('VERSION_MOVED', `version file moved from .versions/ ${versioned_path} to latest ${latest_version_path}, retrying`);
3178+
}
3179+
31363180
async _throw_if_storage_class_not_supported(storage_class) {
31373181
if (!await this._is_storage_class_supported(storage_class)) {
31383182
throw new S3Error(S3Error.InvalidStorageClass);

src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
'use strict';
44

55
const path = require('path');
6-
const config = require('../../../../config');
76
const P = require('../../../util/promise');
87
const fs_utils = require('../../../util/fs_utils');
98
const NamespaceFS = require('../../../sdk/namespace_fs');
@@ -42,23 +41,21 @@ const nsfs = new NamespaceFS({
4241

4342
const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
4443
describe('test versioning concurrency', () => {
45-
const prior_value_of_nsfs_rename_retries = config.NSFS_RENAME_RETRIES;
4644

4745
beforeEach(async () => {
4846
await fs_utils.create_fresh_path(tmp_fs_path);
4947
});
5048

5149
afterEach(async () => {
5250
await fs_utils.folder_delete(tmp_fs_path);
53-
config.NSFS_RENAME_RETRIES = prior_value_of_nsfs_rename_retries;
5451
});
5552

5653
it('multiple puts of the same key', async () => {
5754
const bucket = 'bucket1';
5855
const key = 'key1';
5956
const failed_operations = [];
6057
const successful_operations = [];
61-
const num_of_concurrency = 5;
58+
const num_of_concurrency = 10;
6259
for (let i = 0; i < num_of_concurrency; i++) {
6360
const random_data = Buffer.from(String(i));
6461
const body = buffer_utils.buffer_to_read_stream(random_data);
@@ -169,7 +166,6 @@ describe('test versioning concurrency', () => {
169166
const failed_put_operations = [];
170167
const failed_head_operations = [];
171168
const number_of_iterations = 10;
172-
config.NSFS_RENAME_RETRIES = 40;
173169
for (let i = 0; i < number_of_iterations; i++) {
174170
const random_data = Buffer.from(String(i));
175171
const body = buffer_utils.buffer_to_read_stream(random_data);
@@ -187,6 +183,7 @@ describe('test versioning concurrency', () => {
187183
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
188184
expect(versions.objects.length).toBe(number_of_iterations + 1); // 1 version before + 10 versions concurrent
189185
});
186+
190187
it('concurrent puts & delete latest objects', async () => {
191188
const bucket = 'bucket1';
192189
const key = 'key3';
@@ -255,8 +252,7 @@ describe('test versioning concurrency', () => {
255252
expect(num_of_latest_versions).toBe(1);
256253
}, 6000);
257254

258-
// currently being skipped because it's not passing - probably a bug that we need to fix
259-
it.skip('concurrent delete objects by version id/latest', async () => {
255+
it('concurrent delete objects by version id/latest', async () => {
260256
const bucket = 'bucket1';
261257
const key = 'key5';
262258
const delete_ver_res_arr = [];

0 commit comments

Comments
 (0)