Skip to content

Commit aa1d857

Browse files
committed
Warp concurrent directories creation/deletion
Signed-off-by: Romy <35330373+romayalon@users.noreply.github.com> add concurrency tests Signed-off-by: Romy <35330373+romayalon@users.noreply.github.com>
1 parent 4ece070 commit aa1d857

File tree

4 files changed

+97
-10
lines changed

4 files changed

+97
-10
lines changed

config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,7 @@ config.NSFS_BUF_POOL_WARNING_TIMEOUT = 2 * 60 * 1000;
765765
config.NSFS_SEM_WARNING_TIMEOUT = 10 * 60 * 1000;
766766
// number of rename retries in case of deleted destination directory
767767
config.NSFS_RENAME_RETRIES = 10;
768+
config.NSFS_MKDIR_PATH_RETRIES = 3;
768769

769770
config.NSFS_VERSIONING_ENABLED = true;
770771
config.NSFS_UPDATE_ISSUES_REPORT_ENABLED = true;

src/sdk/namespace_fs.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,7 +1291,7 @@ class NamespaceFS {
12911291
const same_inode = params.copy_source && copy_res === copy_status_enum.SAME_INODE;
12921292
const is_dir_content = this._is_directory_content(file_path, params.key);
12931293

1294-
let stat = await target_file.stat(fs_context);
1294+
const stat = await target_file.stat(fs_context);
12951295
this._verify_encryption(params.encryption, this._get_encryption_info(stat));
12961296

12971297
// handle xattr
@@ -1335,15 +1335,14 @@ class NamespaceFS {
13351335

13361336
if (!same_inode && !part_upload) {
13371337
await this._move_to_dest(fs_context, upload_path, file_path, target_file, open_mode, params.key);
1338-
if (config.NSFS_TRIGGER_FSYNC) await nb_native().fs.fsync(fs_context, path.dirname(file_path));
13391338
}
13401339

13411340
// when object is a dir, xattr are set on the folder itself and the content is in .folder file
13421341
if (is_dir_content) {
13431342
if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr);
13441343
await this._assign_dir_content_to_xattr(fs_context, fs_xattr, { ...params, size: stat.size }, copy_xattr);
13451344
}
1346-
stat = await nb_native().fs.stat(fs_context, file_path);
1345+
stat.xattr = { ...stat.xattr, ...fs_xattr };
13471346
const upload_info = this._get_upload_info(stat, fs_xattr && fs_xattr[XATTR_VERSION_ID]);
13481347
return upload_info;
13491348
}
@@ -1396,6 +1395,7 @@ class NamespaceFS {
13961395
} else {
13971396
await this._move_to_dest_version(fs_context, source_path, dest_path, target_file, key, open_mode);
13981397
}
1398+
if (config.NSFS_TRIGGER_FSYNC) await nb_native().fs.fsync(fs_context, path.dirname(dest_path));
13991399
break;
14001400
} catch (err) {
14011401
retries -= 1;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/* Copyright (C) 2016 NooBaa */
2+
'use strict';
3+
4+
const path = require('path');
5+
const P = require('../../../util/promise');
6+
const fs_utils = require('../../../util/fs_utils');
7+
const NamespaceFS = require('../../../sdk/namespace_fs');
8+
const buffer_utils = require('../../../util/buffer_utils');
9+
const { TMP_PATH } = require('../../system_tests/test_utils');
10+
const { crypto_random_string } = require('../../../util/string_utils');
11+
const endpoint_stats_collector = require('../../../sdk/endpoint_stats_collector');
12+
13+
function make_dummy_object_sdk(nsfs_config, uid, gid) {
14+
return {
15+
requesting_account: {
16+
nsfs_account_config: nsfs_config && {
17+
uid: uid || process.getuid(),
18+
gid: gid || process.getgid(),
19+
backend: '',
20+
}
21+
},
22+
abort_controller: new AbortController(),
23+
throw_if_aborted() {
24+
if (this.abort_controller.signal.aborted) throw new Error('request aborted signal');
25+
}
26+
};
27+
}
28+
29+
const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
30+
describe('test nsfs concurrency', () => {
31+
const tmp_fs_path = path.join(TMP_PATH, 'test_nsfs_concurrency');
32+
33+
const nsfs = new NamespaceFS({
34+
bucket_path: tmp_fs_path,
35+
bucket_id: '1',
36+
namespace_resource_id: undefined,
37+
access_mode: undefined,
38+
versioning: 'DISABLED',
39+
force_md5_etag: false,
40+
stats: endpoint_stats_collector.instance(),
41+
});
42+
43+
beforeEach(async () => {
44+
await fs_utils.create_fresh_path(tmp_fs_path);
45+
});
46+
47+
afterEach(async () => {
48+
await fs_utils.folder_delete(tmp_fs_path);
49+
});
50+
51+
it('multiple puts of the same nested key', async () => {
52+
const bucket = 'bucket1';
53+
const key = 'dir1/key1';
54+
const res_etags = [];
55+
for (let i = 0; i < 15; i++) {
56+
const random_data = Buffer.from(String(crypto_random_string(7)));
57+
const body = buffer_utils.buffer_to_read_stream(random_data);
58+
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
59+
.catch(err => {
60+
console.log('put the same key error - ', err);
61+
throw err;
62+
}).then(res => {
63+
console.log('upload res', res);
64+
res_etags.push(res.etag);
65+
});
66+
await nsfs.delete_object({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK).catch(err => console.log('delete the same key error - ', err));
67+
68+
}
69+
await P.delay(5000);
70+
expect(res_etags).toHaveLength(15);
71+
}, 6000);
72+
});

src/util/native_fs_utils.js

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,29 @@ async function _generate_unique_path(fs_context, tmp_dir_path) {
6868
// opens open_path on POSIX, and on GPFS it will open open_path parent folder
6969
async function open_file(fs_context, bucket_path, open_path, open_mode = config.NSFS_OPEN_READ_MODE,
7070
file_permissions = config.BASE_MODE_FILE) {
71+
let retries = config.NSFS_MKDIR_PATH_RETRIES;
72+
7173
const dir_path = path.dirname(open_path);
72-
if ((open_mode === 'wt' || open_mode === 'w') && dir_path !== bucket_path) {
73-
dbg.log1(`NamespaceFS._open_file: mode=${open_mode} creating dirs`, open_path, bucket_path);
74-
await _make_path_dirs(open_path, fs_context);
75-
}
76-
dbg.log1(`NamespaceFS._open_file: mode=${open_mode}`, open_path);
77-
// for 'wt' open the tmpfile with the parent dir path
7874
const actual_open_path = open_mode === 'wt' ? dir_path : open_path;
79-
return nb_native().fs.open(fs_context, actual_open_path, open_mode, get_umasked_mode(file_permissions));
75+
const should_create_path_dirs = (open_mode === 'wt' || open_mode === 'w') && dir_path !== bucket_path;
76+
for (;;) {
77+
try {
78+
if (should_create_path_dirs) {
79+
dbg.log1(`NamespaceFS._open_file: mode=${open_mode} creating dirs`, open_path, bucket_path);
80+
await _make_path_dirs(open_path, fs_context);
81+
}
82+
dbg.log1(`NamespaceFS._open_file: mode=${open_mode}`, open_path);
83+
// for 'wt' open the tmpfile with the parent dir path
84+
const fd = await nb_native().fs.open(fs_context, actual_open_path, open_mode, get_umasked_mode(file_permissions));
85+
return fd;
86+
} catch (err) {
87+
dbg.warn(`native_fs_utils.open_file Retrying retries=${retries} mode=${open_mode} open_path=${open_path} dir_path=${dir_path} actual_open_path=${actual_open_path}`, err);
88+
if (err.code !== 'ENOENT') throw err;
89+
// case of concurrennt deletion of the dir_path
90+
if (retries <= 0 || !should_create_path_dirs) throw err;
91+
retries -= 1;
92+
}
93+
}
8094
}
8195

8296
/**

0 commit comments

Comments
 (0)