Skip to content

Commit 80c3f7a

Browse files
authored
Merge pull request #8394 from shirady/nsfs-nc-versioning-concurrency-tests
NC | NSFS | Add Concurrency Tests
2 parents 8d71b8f + 10d6fc7 commit 80c3f7a

File tree

1 file changed

+97
-21
lines changed

1 file changed

+97
-21
lines changed

src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js

Lines changed: 97 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
'use strict';
33

44
const path = require('path');
5+
const config = require('../../../../config');
56
const P = require('../../../util/promise');
67
const fs_utils = require('../../../util/fs_utils');
78
const NamespaceFS = require('../../../sdk/namespace_fs');
@@ -26,60 +27,63 @@ function make_dummy_object_sdk(nsfs_config, uid, gid) {
2627
};
2728
}
2829

30+
// Shared test fixture, hoisted to module scope so that both the test suite
// and the module-level `_upload_versions` helper can reach it:
// an NSFS namespace rooted in a per-suite temp directory with object
// versioning enabled.
const tmp_fs_path = path.join(TMP_PATH, 'test_versioning_concurrency');

const nsfs = new NamespaceFS({
    bucket_path: tmp_fs_path,
    bucket_id: '1',
    namespace_resource_id: undefined,
    access_mode: undefined,
    versioning: 'ENABLED',
    force_md5_etag: false,
    stats: endpoint_stats_collector.instance(),
});

// Dummy SDK context reused by every test call into nsfs.
const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
3043
describe('test versioning concurrency', () => {
31-
const tmp_fs_path = path.join(TMP_PATH, 'test_versioning_concurrency');
32-
33-
const nsfs = new NamespaceFS({
34-
bucket_path: tmp_fs_path,
35-
bucket_id: '1',
36-
namespace_resource_id: undefined,
37-
access_mode: undefined,
38-
versioning: 'ENABLED',
39-
force_md5_etag: false,
40-
stats: endpoint_stats_collector.instance(),
41-
});
44+
const prior_value_of_nsfs_rename_retries = config.NSFS_RENAME_RETRIES;
4245

4346
beforeEach(async () => {
4447
await fs_utils.create_fresh_path(tmp_fs_path);
4548
});
4649

4750
afterEach(async () => {
4851
await fs_utils.folder_delete(tmp_fs_path);
52+
config.NSFS_RENAME_RETRIES = prior_value_of_nsfs_rename_retries;
4953
});
5054

5155
it('multiple puts of the same key', async () => {
5256
const bucket = 'bucket1';
5357
const key = 'key1';
58+
const failed_operations = [];
5459
for (let i = 0; i < 5; i++) {
5560
const random_data = Buffer.from(String(i));
5661
const body = buffer_utils.buffer_to_read_stream(random_data);
57-
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK).catch(err => console.log('multiple puts of the same key error - ', err));
62+
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
63+
.catch(err => failed_operations.push(err));
5864
}
5965
await P.delay(1000);
66+
expect(failed_operations.length).toBe(0);
6067
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
6168
expect(versions.objects.length).toBe(5);
6269
});
6370

6471
it('multiple delete version id and key', async () => {
6572
const bucket = 'bucket1';
6673
const key = 'key2';
67-
const versions_arr = [];
68-
// upload 5 versions of key2
69-
for (let i = 0; i < 5; i++) {
70-
const random_data = Buffer.from(String(i));
71-
const body = buffer_utils.buffer_to_read_stream(random_data);
72-
const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK).catch(err => console.log('put error - ', err));
73-
versions_arr.push(res.etag);
74-
}
74+
const number_of_versions = 5;
75+
const versions_arr = await _upload_versions(bucket, key, number_of_versions);
76+
7577
const mid_version_id = versions_arr[3];
7678
const number_of_successful_operations = [];
79+
const failed_operations = [];
7780
for (let i = 0; i < 15; i++) {
7881
nsfs.delete_object({ bucket: bucket, key: key, version_id: mid_version_id }, DUMMY_OBJECT_SDK)
7982
.then(res => number_of_successful_operations.push(res))
80-
.catch(err => console.log('delete the same key & version id error - ', err));
83+
.catch(err => failed_operations.push(err));
8184
}
8285
await P.delay(1000);
86+
expect(failed_operations.length).toBe(0);
8387
expect(number_of_successful_operations.length).toBe(15);
8488
});
8589

@@ -124,4 +128,76 @@ describe('test versioning concurrency', () => {
124128
const list_res = await nsfs.list_objects({ bucket: bucket }, DUMMY_OBJECT_SDK);
125129
expect(list_res.objects.length).toBe(0);
126130
}, 8000);
131+
132+
it('concurrent delete of latest version', async () => {
133+
const bucket = 'bucket1';
134+
const key = 'key3';
135+
const number_of_versions = 5;
136+
const versions_arr = await _upload_versions(bucket, key, number_of_versions);
137+
expect(versions_arr.length).toBe(number_of_versions);
138+
139+
const successful_operations = [];
140+
const failed_operations = [];
141+
for (let i = 0; i < 3; i++) {
142+
nsfs.delete_object({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
143+
.then(res => successful_operations.push(res))
144+
.catch(err => failed_operations.push(err));
145+
}
146+
147+
await P.delay(1000);
148+
expect(failed_operations.length).toBe(0);
149+
expect(successful_operations.length).toBe(3);
150+
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
151+
expect(versions.objects.length).toBe(8); // 5 versions before + 3 delete markers concurrent
152+
const delete_marker_arr = versions.objects.filter(object => object.delete_marker === true);
153+
expect(delete_marker_arr.length).toBe(3);
154+
});
155+
156+
it('concurrent put object and head object latest version', async () => {
157+
const bucket = 'bucket1';
158+
const key = 'key4';
159+
await _upload_versions(bucket, key, 1);
160+
161+
const successful_put_operations = [];
162+
const successful_head_operations = [];
163+
const failed_put_operations = [];
164+
const failed_head_operations = [];
165+
const number_of_iterations = 10;
166+
config.NSFS_RENAME_RETRIES = 40;
167+
for (let i = 0; i < number_of_iterations; i++) {
168+
const random_data = Buffer.from(String(i));
169+
const body = buffer_utils.buffer_to_read_stream(random_data);
170+
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
171+
.then(res => successful_put_operations.push(res))
172+
.catch(err => failed_put_operations.push(err));
173+
nsfs.read_object_md({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
174+
.then(res => successful_head_operations.push(res))
175+
.catch(err => failed_head_operations.push(err));
176+
}
177+
await P.delay(1000);
178+
expect(failed_put_operations.length).toBe(0);
179+
expect(failed_head_operations.length).toBe(0);
180+
expect(successful_head_operations.length).toBe(number_of_iterations);
181+
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
182+
expect(versions.objects.length).toBe(number_of_iterations + 1); // 1 version before + 10 versions concurrent
183+
});
127184
});
185+
186+
/**
 * _upload_versions uploads number_of_versions of key in bucket with a body of random data
 * note: this function is not concurrent, it's a helper function for preparing a bucket with a couple of versions
 * @param {string} bucket
 * @param {string} key
 * @param {number} number_of_versions
 * @returns {Promise<string[]>} etags of the uploaded versions, in upload order
 * @throws propagates any upload_object failure so the calling test fails at the real cause
 */
async function _upload_versions(bucket, key, number_of_versions) {
    const versions_arr = [];
    for (let i = 0; i < number_of_versions; i++) {
        const random_data = Buffer.from(String(i));
        const body = buffer_utils.buffer_to_read_stream(random_data);
        // Do NOT catch-and-log here: the previous `.catch(err => console.log(...))`
        // swallowed failures and left `res` undefined, so `res.etag` below crashed
        // with a misleading TypeError and tests ran against a broken fixture.
        // Letting the rejection propagate fails the test with the actual error.
        const res = await nsfs.upload_object({ bucket, key, source_stream: body }, DUMMY_OBJECT_SDK);
        versions_arr.push(res.etag);
    }
    return versions_arr;
}

0 commit comments

Comments
 (0)