'use strict';

const path = require('path');
+const config = require('../../../../config');
const P = require('../../../util/promise');
const fs_utils = require('../../../util/fs_utils');
const NamespaceFS = require('../../../sdk/namespace_fs');
@@ -26,60 +27,63 @@ function make_dummy_object_sdk(nsfs_config, uid, gid) {
    };
}

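+// defined at module scope (rather than inside the describe block) so the _upload_versions helper below can use them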
+const tmp_fs_path = path.join(TMP_PATH, 'test_versioning_concurrency');
+
+const nsfs = new NamespaceFS({
+    bucket_path: tmp_fs_path,
+    bucket_id: '1',
+    namespace_resource_id: undefined,
+    access_mode: undefined,
+    versioning: 'ENABLED',
+    force_md5_etag: false,
+    stats: endpoint_stats_collector.instance(),
+});
+
const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
describe('test versioning concurrency', () => {
-    const tmp_fs_path = path.join(TMP_PATH, 'test_versioning_concurrency');
-
-    const nsfs = new NamespaceFS({
-        bucket_path: tmp_fs_path,
-        bucket_id: '1',
-        namespace_resource_id: undefined,
-        access_mode: undefined,
-        versioning: 'ENABLED',
-        force_md5_etag: false,
-        stats: endpoint_stats_collector.instance(),
-    });
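+    // remember the configured rename retry count so afterEach can restore it for tests that override it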
+    const prior_value_of_nsfs_rename_retries = config.NSFS_RENAME_RETRIES;

    beforeEach(async () => {
        await fs_utils.create_fresh_path(tmp_fs_path);
    });

    afterEach(async () => {
        await fs_utils.folder_delete(tmp_fs_path);
+        config.NSFS_RENAME_RETRIES = prior_value_of_nsfs_rename_retries;
    });

    it('multiple puts of the same key', async () => {
        const bucket = 'bucket1';
        const key = 'key1';
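+        // collect failures so the assertion after the delay fails the test instead of just logging errors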
+        const failed_operations = [];
        for (let i = 0; i < 5; i++) {
            const random_data = Buffer.from(String(i));
            const body = buffer_utils.buffer_to_read_stream(random_data);
-            nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK).catch(err => console.log('multiple puts of the same key error - ', err));
+            nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
+                .catch(err => failed_operations.push(err));
        }
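+        // the puts above are intentionally not awaited; give them time to settle before asserting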
        await P.delay(1000);
+        expect(failed_operations.length).toBe(0);
        const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
        expect(versions.objects.length).toBe(5);
    });

    it('multiple delete version id and key', async () => {
        const bucket = 'bucket1';
        const key = 'key2';
-        const versions_arr = [];
-        // upload 5 versions of key2
-        for (let i = 0; i < 5; i++) {
-            const random_data = Buffer.from(String(i));
-            const body = buffer_utils.buffer_to_read_stream(random_data);
-            const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK).catch(err => console.log('put error - ', err));
-            versions_arr.push(res.etag);
-        }
+        const number_of_versions = 5;
+        const versions_arr = await _upload_versions(bucket, key, number_of_versions);
+
        const mid_version_id = versions_arr[3];
        const number_of_successful_operations = [];
+        const failed_operations = [];
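+        // all 15 concurrent deletes of the same version id are expected to succeed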
        for (let i = 0; i < 15; i++) {
            nsfs.delete_object({ bucket: bucket, key: key, version_id: mid_version_id }, DUMMY_OBJECT_SDK)
                .then(res => number_of_successful_operations.push(res))
-                .catch(err => console.log('delete the same key & version id error - ', err));
+                .catch(err => failed_operations.push(err));
        }
        await P.delay(1000);
+        expect(failed_operations.length).toBe(0);
        expect(number_of_successful_operations.length).toBe(15);
    });

@@ -124,4 +128,76 @@ describe('test versioning concurrency', () => {
        const list_res = await nsfs.list_objects({ bucket: bucket }, DUMMY_OBJECT_SDK);
        expect(list_res.objects.length).toBe(0);
    }, 8000);
+
+    it('concurrent delete of latest version', async () => {
+        const bucket = 'bucket1';
+        const key = 'key3';
+        const number_of_versions = 5;
+        const versions_arr = await _upload_versions(bucket, key, number_of_versions);
+        expect(versions_arr.length).toBe(number_of_versions);
+
+        const successful_operations = [];
+        const failed_operations = [];
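+        // deleting the latest version in a versioning-enabled bucket should add a delete marker, not remove data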
+        for (let i = 0; i < 3; i++) {
+            nsfs.delete_object({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
+                .then(res => successful_operations.push(res))
+                .catch(err => failed_operations.push(err));
+        }
+
+        await P.delay(1000);
+        expect(failed_operations.length).toBe(0);
+        expect(successful_operations.length).toBe(3);
+        const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
+        expect(versions.objects.length).toBe(8); // 5 versions uploaded beforehand + 3 delete markers created concurrently
+        const delete_marker_arr = versions.objects.filter(object => object.delete_marker === true);
+        expect(delete_marker_arr.length).toBe(3);
+    });
+
+    it('concurrent put object and head object latest version', async () => {
+        const bucket = 'bucket1';
+        const key = 'key4';
+        await _upload_versions(bucket, key, 1);
+
+        const successful_put_operations = [];
+        const successful_head_operations = [];
+        const failed_put_operations = [];
+        const failed_head_operations = [];
+        const number_of_iterations = 10;
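+        // concurrent puts of the same key contend on renaming the latest version into place,
+        // so allow more rename retries than the default (restored in afterEach)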
+        config.NSFS_RENAME_RETRIES = 40;
+        for (let i = 0; i < number_of_iterations; i++) {
+            const random_data = Buffer.from(String(i));
+            const body = buffer_utils.buffer_to_read_stream(random_data);
+            nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
+                .then(res => successful_put_operations.push(res))
+                .catch(err => failed_put_operations.push(err));
+            nsfs.read_object_md({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
+                .then(res => successful_head_operations.push(res))
+                .catch(err => failed_head_operations.push(err));
+        }
+        await P.delay(1000);
+        expect(failed_put_operations.length).toBe(0);
+        expect(failed_head_operations.length).toBe(0);
+        expect(successful_head_operations.length).toBe(number_of_iterations);
+        const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
+        expect(versions.objects.length).toBe(number_of_iterations + 1); // 1 version uploaded beforehand + 10 versions written concurrently
+    });
});
+
+/**
+ * _upload_versions uploads number_of_versions versions of key in bucket, each with a small distinct body.
+ * Note: the uploads run sequentially; this is a helper for seeding a bucket with a few versions, not a concurrency test.
+ * @param {string} bucket
+ * @param {string} key
+ * @param {number} number_of_versions
+ */
+async function _upload_versions(bucket, key, number_of_versions) {
+    const versions_arr = [];
+    for (let i = 0; i < number_of_versions; i++) {
+        const random_data = Buffer.from(String(i));
+        const body = buffer_utils.buffer_to_read_stream(random_data);
+        // let upload failures propagate: swallowing them here would leave res undefined and crash on res.etag
+        const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK);
+        versions_arr.push(res.etag);
+    }
+    return versions_arr;
+}