@@ -5,8 +5,7 @@ const path = require('path');
5
5
const { PersistentLogger } = require ( '../util/persistent_logger' ) ;
6
6
const config = require ( '../../config' ) ;
7
7
const nb_native = require ( '../util/nb_native' ) ;
8
- const { GlacierBackend } = require ( '../sdk/nsfs_glacier_backend/backend' ) ;
9
- const { getGlacierBackend } = require ( '../sdk/nsfs_glacier_backend/helper' ) ;
8
+ const { Glacier } = require ( '../sdk/glacier' ) ;
10
9
const native_fs_utils = require ( '../util/native_fs_utils' ) ;
11
10
12
11
const CLUSTER_LOCK = 'cluster.lock' ;
@@ -16,14 +15,15 @@ async function process_migrations() {
16
15
const fs_context = native_fs_utils . get_process_fs_context ( ) ;
17
16
18
17
await lock_and_run ( fs_context , CLUSTER_LOCK , async ( ) => {
19
- const backend = getGlacierBackend ( ) ;
18
+ const backend = Glacier . getBackend ( ) ;
20
19
21
20
if (
22
21
await backend . low_free_space ( ) ||
23
- await time_exceeded ( fs_context , config . NSFS_GLACIER_MIGRATE_INTERVAL , GlacierBackend . MIGRATE_TIMESTAMP_FILE )
22
+ await time_exceeded ( fs_context , config . NSFS_GLACIER_MIGRATE_INTERVAL , Glacier . MIGRATE_TIMESTAMP_FILE ) ||
23
+ await migrate_log_exceeds_threshold ( )
24
24
) {
25
25
await run_glacier_migrations ( fs_context , backend ) ;
26
- await record_current_time ( fs_context , GlacierBackend . MIGRATE_TIMESTAMP_FILE ) ;
26
+ await record_current_time ( fs_context , Glacier . MIGRATE_TIMESTAMP_FILE ) ;
27
27
}
28
28
} ) ;
29
29
}
@@ -32,56 +32,56 @@ async function process_migrations() {
32
32
* run_tape_migrations reads the migration WALs and attempts to migrate the
33
33
* files mentioned in the WAL.
34
34
* @param {nb.NativeFSContext } fs_context
35
- * @param {import('../sdk/nsfs_glacier_backend/backend ').GlacierBackend } backend
35
+ * @param {import('../sdk/glacier ').Glacier } backend
36
36
*/
37
37
async function run_glacier_migrations ( fs_context , backend ) {
38
- await run_glacier_operation ( fs_context , GlacierBackend . MIGRATE_WAL_NAME , backend . migrate . bind ( backend ) ) ;
38
+ await run_glacier_operation ( fs_context , Glacier . MIGRATE_WAL_NAME , backend . migrate . bind ( backend ) ) ;
39
39
}
40
40
41
41
async function process_restores ( ) {
42
42
const fs_context = native_fs_utils . get_process_fs_context ( ) ;
43
43
44
44
await lock_and_run ( fs_context , CLUSTER_LOCK , async ( ) => {
45
- const backend = getGlacierBackend ( ) ;
45
+ const backend = Glacier . getBackend ( ) ;
46
46
47
47
if (
48
48
await backend . low_free_space ( ) ||
49
- ! ( await time_exceeded ( fs_context , config . NSFS_GLACIER_RESTORE_INTERVAL , GlacierBackend . RESTORE_TIMESTAMP_FILE ) )
49
+ ! ( await time_exceeded ( fs_context , config . NSFS_GLACIER_RESTORE_INTERVAL , Glacier . RESTORE_TIMESTAMP_FILE ) )
50
50
) return ;
51
51
52
52
53
53
await run_glacier_restore ( fs_context , backend ) ;
54
- await record_current_time ( fs_context , GlacierBackend . RESTORE_TIMESTAMP_FILE ) ;
54
+ await record_current_time ( fs_context , Glacier . RESTORE_TIMESTAMP_FILE ) ;
55
55
} ) ;
56
56
}
57
57
58
58
/**
59
59
* run_tape_restore reads the restore WALs and attempts to restore the
60
60
* files mentioned in the WAL.
61
- * @param {nb.NativeFSContext } fs_context
62
- * @param {import('../sdk/nsfs_glacier_backend/backend ').GlacierBackend } backend
61
+ * @param {nb.NativeFSContext } fs_context
62
+ * @param {import('../sdk/glacier ').Glacier } backend
63
63
*/
64
64
async function run_glacier_restore ( fs_context , backend ) {
65
- await run_glacier_operation ( fs_context , GlacierBackend . RESTORE_WAL_NAME , backend . restore . bind ( backend ) ) ;
65
+ await run_glacier_operation ( fs_context , Glacier . RESTORE_WAL_NAME , backend . restore . bind ( backend ) ) ;
66
66
}
67
67
68
68
async function process_expiry ( ) {
69
69
const fs_context = native_fs_utils . get_process_fs_context ( ) ;
70
70
71
71
await lock_and_run ( fs_context , SCAN_LOCK , async ( ) => {
72
- const backend = getGlacierBackend ( ) ;
72
+ const backend = Glacier . getBackend ( ) ;
73
73
if (
74
74
await backend . low_free_space ( ) ||
75
75
await is_desired_time (
76
76
fs_context ,
77
77
new Date ( ) ,
78
78
config . NSFS_GLACIER_EXPIRY_RUN_TIME ,
79
79
config . NSFS_GLACIER_EXPIRY_RUN_DELAY_LIMIT_MINS ,
80
- GlacierBackend . EXPIRY_TIMESTAMP_FILE ,
80
+ Glacier . EXPIRY_TIMESTAMP_FILE ,
81
81
)
82
82
) {
83
83
await backend . expiry ( fs_context ) ;
84
- await record_current_time ( fs_context , GlacierBackend . EXPIRY_TIMESTAMP_FILE ) ;
84
+ await record_current_time ( fs_context , Glacier . EXPIRY_TIMESTAMP_FILE ) ;
85
85
}
86
86
} ) ;
87
87
}
@@ -168,6 +168,27 @@ async function record_current_time(fs_context, timestamp_file) {
168
168
) ;
169
169
}
170
170
171
+ /**
172
+ * migrate_log_exceeds_threshold returns true if the underlying backend
173
+ * decides that the migrate log size has exceeded the given size threshold.
174
+ * @param {number } [threshold]
175
+ * @returns {Promise<boolean> }
176
+ */
177
+ async function migrate_log_exceeds_threshold ( threshold = config . NSFS_GLACIER_MIGRATE_LOG_THRESHOLD ) {
178
+ const log = new PersistentLogger ( config . NSFS_GLACIER_LOGS_DIR , Glacier . MIGRATE_WAL_NAME , { locking : null } ) ;
179
+ let log_size = Number . MAX_SAFE_INTEGER ;
180
+ try {
181
+ const fh = await log . _open ( ) ;
182
+
183
+ const { size } = await fh . stat ( log . fs_context ) ;
184
+ log_size = size ;
185
+ } catch ( error ) {
186
+ console . error ( "failed to get size of" , Glacier . MIGRATE_WAL_NAME , error ) ;
187
+ }
188
+
189
+ return log_size > threshold ;
190
+ }
191
+
171
192
/**
172
193
* run_glacier_operations takes a log_namespace and a callback and executes the
173
194
* callback on each log file in that namespace. It will also generate a failure
@@ -178,6 +199,8 @@ async function record_current_time(fs_context, timestamp_file) {
178
199
*/
179
200
async function run_glacier_operation ( fs_context , log_namespace , cb ) {
180
201
const log = new PersistentLogger ( config . NSFS_GLACIER_LOGS_DIR , log_namespace , { locking : 'EXCLUSIVE' } ) ;
202
+
203
+ fs_context = prepare_galcier_fs_context ( fs_context ) ;
181
204
try {
182
205
await log . process ( async ( entry , failure_recorder ) => cb ( fs_context , entry , failure_recorder ) ) ;
183
206
} catch ( error ) {
@@ -230,6 +253,28 @@ async function lock_and_run(fs_context, lockfilename, cb) {
230
253
}
231
254
}
232
255
256
+ /**
257
+ * prepare_galcier_fs_context returns a shallow copy of given
258
+ * fs_context with backend set to 'GPFS'.
259
+ *
260
+ * NOTE: The function will throw error if it detects that libgfs
261
+ * isn't loaded.
262
+ *
263
+ * @param {nb.NativeFSContext } fs_context
264
+ * @returns {nb.NativeFSContext }
265
+ */
266
+ function prepare_galcier_fs_context ( fs_context ) {
267
+ if ( config . NSFS_GLACIER_DMAPI_ENABLE ) {
268
+ if ( ! nb_native ( ) . fs . gpfs ) {
269
+ throw new Error ( 'cannot use DMAPI xattrs: libgpfs not loaded' ) ;
270
+ }
271
+
272
+ return { ...fs_context , backend : 'GPFS' , use_dmapi : true } ;
273
+ }
274
+
275
+ return { ...fs_context } ;
276
+ }
277
+
233
278
exports . process_migrations = process_migrations ;
234
279
exports . process_restores = process_restores ;
235
280
exports . process_expiry = process_expiry ;
0 commit comments