@@ -483,7 +483,7 @@ class NamespaceFS {
483
483
}
484
484
485
485
/**
486
- * @param {nb.ObjectSDK } object_sdk
486
+ * @param {nb.ObjectSDK } object_sdk
487
487
* @returns {nb.NativeFSContext }
488
488
*/
489
489
prepare_fs_context ( object_sdk ) {
@@ -1090,7 +1090,9 @@ class NamespaceFS {
1090
1090
// end the stream
1091
1091
res . end ( ) ;
1092
1092
1093
- await stream_utils . wait_finished ( res , { signal : object_sdk . abort_controller . signal } ) ;
1093
+ // in case of transform streams such as ChunkFS there is also a readable part. since we expect write stream
1094
+ // and don't care about the readable part, set readable: false
1095
+ await stream_utils . wait_finished ( res , { readable : false , signal : object_sdk . abort_controller . signal } ) ;
1094
1096
object_sdk . throw_if_aborted ( ) ;
1095
1097
1096
1098
dbg . log0 ( 'NamespaceFS: read_object_stream completed file' , file_path , {
@@ -1209,9 +1211,7 @@ class NamespaceFS {
1209
1211
}
1210
1212
1211
1213
if ( copy_res ) {
1212
- if ( copy_res === copy_status_enum . FALLBACK ) {
1213
- params . copy_source . nsfs_copy_fallback ( ) ;
1214
- } else {
1214
+ if ( copy_res !== copy_status_enum . FALLBACK ) {
1215
1215
// open file after copy link/same inode should use read open mode
1216
1216
open_mode = config . NSFS_OPEN_READ_MODE ;
1217
1217
if ( copy_res === copy_status_enum . SAME_INODE ) open_path = file_path ;
@@ -1294,10 +1294,8 @@ class NamespaceFS {
1294
1294
const stat = await target_file . stat ( fs_context ) ;
1295
1295
this . _verify_encryption ( params . encryption , this . _get_encryption_info ( stat ) ) ;
1296
1296
1297
- // handle xattr
1298
- // assign user xattr on non copy / copy with xattr_copy header provided
1299
1297
const copy_xattr = params . copy_source && params . xattr_copy ;
1300
- let fs_xattr = copy_xattr ? undefined : to_fs_xattr ( params . xattr ) ;
1298
+ let fs_xattr = to_fs_xattr ( params . xattr ) ;
1301
1299
1302
1300
// assign noobaa internal xattr - content type, md5, versioning xattr
1303
1301
if ( params . content_type ) {
@@ -1339,7 +1337,6 @@ class NamespaceFS {
1339
1337
1340
1338
// when object is a dir, xattr are set on the folder itself and the content is in .folder file
1341
1339
if ( is_dir_content ) {
1342
- if ( params . copy_source ) fs_xattr = await this . _get_copy_source_xattr ( params , fs_context , fs_xattr ) ;
1343
1340
await this . _assign_dir_content_to_xattr ( fs_context , fs_xattr , { ...params , size : stat . size } , copy_xattr ) ;
1344
1341
}
1345
1342
stat . xattr = { ...stat . xattr , ...fs_xattr } ;
@@ -1351,12 +1348,11 @@ class NamespaceFS {
1351
1348
await native_fs_utils . _make_path_dirs ( file_path , fs_context ) ;
1352
1349
const copy_xattr = params . copy_source && params . xattr_copy ;
1353
1350
1354
- let fs_xattr = copy_xattr ? { } : to_fs_xattr ( params . xattr ) || { } ;
1351
+ let fs_xattr = to_fs_xattr ( params . xattr ) || { } ;
1355
1352
if ( params . content_type ) {
1356
1353
fs_xattr = fs_xattr || { } ;
1357
1354
fs_xattr [ XATTR_CONTENT_TYPE ] = params . content_type ;
1358
1355
}
1359
- if ( params . copy_source ) fs_xattr = await this . _get_copy_source_xattr ( params , fs_context , fs_xattr ) ;
1360
1356
1361
1357
await this . _assign_dir_content_to_xattr ( fs_context , fs_xattr , params , copy_xattr ) ;
1362
1358
// when .folder exist and it's no upload flow - .folder should be deleted if it exists
@@ -1372,13 +1368,6 @@ class NamespaceFS {
1372
1368
return upload_info ;
1373
1369
}
1374
1370
1375
- async _get_copy_source_xattr ( params , fs_context , fs_xattr ) {
1376
- const is_source_dir = params . copy_source . key . endsWith ( '/' ) ;
1377
- const source_file_md_path = await this . _find_version_path ( fs_context , params . copy_source , is_source_dir ) ;
1378
- const source_stat = await nb_native ( ) . fs . stat ( fs_context , source_file_md_path ) ;
1379
- return { ...source_stat . xattr , ...fs_xattr } ;
1380
- }
1381
-
1382
1371
// move to dest GPFS (wt) / POSIX (w / undefined) - non part upload
1383
1372
async _move_to_dest ( fs_context , source_path , dest_path , target_file , open_mode , key ) {
1384
1373
let retries = config . NSFS_RENAME_RETRIES ;
@@ -1511,7 +1500,7 @@ class NamespaceFS {
1511
1500
// Can be finetuned further on if needed and inserting the Semaphore logic inside
1512
1501
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
1513
1502
async _upload_stream ( { fs_context, params, target_file, object_sdk, offset } ) {
1514
- const { source_stream } = params ;
1503
+ const { source_stream, copy_source } = params ;
1515
1504
try {
1516
1505
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
1517
1506
const md5_enabled = this . _is_force_md5_enabled ( object_sdk ) ;
@@ -1526,8 +1515,14 @@ class NamespaceFS {
1526
1515
large_buf_size : multi_buffer_pool . get_buffers_pool ( undefined ) . buf_size
1527
1516
} ) ;
1528
1517
chunk_fs . on ( 'error' , err1 => dbg . error ( 'namespace_fs._upload_stream: error occured on stream ChunkFS: ' , err1 ) ) ;
1529
- await stream_utils . pipeline ( [ source_stream , chunk_fs ] ) ;
1530
- await stream_utils . wait_finished ( chunk_fs ) ;
1518
+ if ( copy_source ) {
1519
+ await this . read_object_stream ( copy_source , object_sdk , chunk_fs ) ;
1520
+ } else if ( params . source_params ) {
1521
+ await params . source_ns . read_object_stream ( params . source_params , object_sdk , chunk_fs ) ;
1522
+ } else {
1523
+ await stream_utils . pipeline ( [ source_stream , chunk_fs ] ) ;
1524
+ await stream_utils . wait_finished ( chunk_fs ) ;
1525
+ }
1531
1526
return { digest : chunk_fs . digest , total_bytes : chunk_fs . total_bytes } ;
1532
1527
} catch ( error ) {
1533
1528
dbg . error ( '_upload_stream had error: ' , error ) ;
@@ -1813,6 +1808,7 @@ class NamespaceFS {
1813
1808
upload_params . params . xattr = create_params_parsed . xattr ;
1814
1809
upload_params . params . storage_class = create_params_parsed . storage_class ;
1815
1810
upload_params . digest = MD5Async && ( ( ( await MD5Async . digest ( ) ) . toString ( 'hex' ) ) + '-' + multiparts . length ) ;
1811
+ upload_params . params . content_type = create_params_parsed . content_type ;
1816
1812
1817
1813
const upload_info = await this . _finish_upload ( upload_params ) ;
1818
1814
0 commit comments