@@ -5,11 +5,13 @@ const dbg = require('../util/debug_module')(__filename);
5
5
const config = require ( '../../config' ) ;
6
6
const stream = require ( 'stream' ) ;
7
7
const crypto = require ( 'crypto' ) ;
8
+ const path = require ( 'path' ) ;
8
9
const { PersistentLogger, LogFile } = require ( '../util/persistent_logger' ) ;
9
10
const { format_aws_date } = require ( '../util/time_utils' ) ;
10
11
const nsfs_schema_utils = require ( '../manage_nsfs/nsfs_schema_utils' ) ;
11
12
const Semaphore = require ( '../util/semaphore' ) ;
12
13
const P = require ( '../util/promise' ) ;
14
+ const nb_native = require ( '../util/nb_native' ) ;
13
15
14
16
const sem = new Semaphore ( config . BUCKET_LOG_CONCURRENCY ) ;
15
17
@@ -27,16 +29,21 @@ const BUCKET_NAME_DEL = "_";
27
29
* @param {AWS.S3 } s3_connection
28
30
* @param {function } bucket_to_owner_keys_func
29
31
*/
30
- async function export_logs_to_target ( config_fs , s3_connection , bucket_to_owner_keys_func ) {
31
- const log = new PersistentLogger ( config . PERSISTENT_BUCKET_LOG_DIR , config . PERSISTENT_BUCKET_LOG_NS , { locking : 'EXCLUSIVE' } ) ;
32
- try {
33
- return log . process ( async file => _upload_to_targets ( config_fs , s3_connection , file , bucket_to_owner_keys_func ) ) ;
34
- } catch ( err ) {
35
- dbg . error ( 'processing log file failed' , log . file ) ;
36
- throw err ;
37
- } finally {
38
- await log . close ( ) ;
39
- }
32
+ async function export_logs_to_target ( fs_context , s3_connection , bucket_to_owner_keys_func ) {
33
+ const entries = await nb_native ( ) . fs . readdir ( fs_context , config . PERSISTENT_BUCKET_LOG_DIR ) ;
34
+ const results = await P . map_with_concurrency ( 5 , entries , async entry => {
35
+ if ( ! entry . name . endsWith ( '.log' ) ) return ;
36
+ const log = new PersistentLogger ( config . PERSISTENT_BUCKET_LOG_DIR , path . parse ( entry . name ) . name , { locking : 'EXCLUSIVE' } ) ;
37
+ try {
38
+ return log . process ( async file => _upload_to_targets ( fs_context , s3_connection , file , bucket_to_owner_keys_func ) ) ;
39
+ } catch ( err ) {
40
+ dbg . error ( 'processing log file failed' , log . file ) ;
41
+ throw err ;
42
+ } finally {
43
+ await log . close ( ) ;
44
+ }
45
+ } ) ;
46
+ return ! results . includes ( false ) ;
40
47
}
41
48
42
49
/**
0 commit comments