@@ -1140,9 +1140,7 @@ class NamespaceFS {
1140
1140
// end the stream
1141
1141
res . end ( ) ;
1142
1142
1143
- // in case of transform streams such as ChunkFS there is also a readable part. since we expect write stream
1144
- // and don't care about the readable part, set readable: false
1145
- await stream_utils . wait_finished ( res , { readable : false , signal : object_sdk . abort_controller . signal } ) ;
1143
+ await stream_utils . wait_finished ( res , { signal : object_sdk . abort_controller . signal } ) ;
1146
1144
object_sdk . throw_if_aborted ( ) ;
1147
1145
1148
1146
dbg . log0 ( 'NamespaceFS: read_object_stream completed file' , file_path , {
@@ -1581,9 +1579,15 @@ class NamespaceFS {
1581
1579
large_buf_size : multi_buffer_pool . get_buffers_pool ( undefined ) . buf_size
1582
1580
} ) ;
1583
1581
chunk_fs . on ( 'error' , err1 => dbg . error ( 'namespace_fs._upload_stream: error occured on stream ChunkFS: ' , err1 ) ) ;
1582
+ chunk_fs . on ( 'finish' , arg => dbg . error ( 'namespace_fs._upload_stream: finish occured on stream ChunkFS: ' , arg ) ) ;
1583
+ chunk_fs . on ( 'close' , arg => dbg . error ( 'namespace_fs._upload_stream: close occured on stream ChunkFS: ' , arg ) ) ;
1584
1584
if ( copy_source ) {
1585
+ // ChunkFS is a Transform stream, however read_object_stream expects a write stream. call resume to close the read part
1586
+ // we need to close both read and write parts for Transform stream to properly close and release resorces
1587
+ chunk_fs . resume ( ) ;
1585
1588
await this . read_object_stream ( copy_source , object_sdk , chunk_fs ) ;
1586
1589
} else if ( params . source_params ) {
1590
+ chunk_fs . resume ( ) ;
1587
1591
await params . source_ns . read_object_stream ( params . source_params , object_sdk , chunk_fs ) ;
1588
1592
} else {
1589
1593
await stream_utils . pipeline ( [ source_stream , chunk_fs ] ) ;
0 commit comments