@@ -38,6 +38,7 @@ const size_utils = require('../../util/size_utils');
38
38
const os_utils = require ( '../../util/os_utils' ) ;
39
39
const config = require ( '../../../config' ) ;
40
40
const db_client = require ( '../../util/db_client' ) ;
41
+ const { decode_json } = require ( '../../util/postgres_client' ) ;
41
42
42
43
const { RpcError } = require ( '../../rpc' ) ;
43
44
const master_key_manager = require ( './master_key_manager' ) ;
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');
152
153
153
154
const accounts_by_email_lowercase = [ ] ;
154
155
156
+ const SOURCE = Object . freeze ( {
157
+ DB : 'DB' ,
158
+ CORE : 'CORE' ,
159
+ } ) ;
155
160
156
161
/**
157
162
*
@@ -352,6 +357,8 @@ class SystemStore extends EventEmitter {
352
357
this . START_REFRESH_THRESHOLD = 10 * 60 * 1000 ;
353
358
this . FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000 ;
354
359
this . SYSTEM_STORE_LOAD_CONCURRENCY = config . SYSTEM_STORE_LOAD_CONCURRENCY || 5 ;
360
+ this . source = ( process . env . HOSTNAME && process . env . HOSTNAME . indexOf ( "endpoint" ) === - 1 ) ? SOURCE . DB : SOURCE . CORE ;
361
+ dbg . log0 ( "system store source is" , this . source ) ;
355
362
this . _load_serial = new semaphore . Semaphore ( 1 , { warning_timeout : this . START_REFRESH_THRESHOLD } ) ;
356
363
for ( const col of COLLECTIONS ) {
357
364
db_client . instance ( ) . define_collection ( col ) ;
@@ -406,8 +413,6 @@ class SystemStore extends EventEmitter {
406
413
dbg . warn ( `system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${ since_load } , FORCE_REFRESH_THRESHOLD = ${ this . FORCE_REFRESH_THRESHOLD } ` ) ;
407
414
res = this . load ( ) ;
408
415
}
409
- //call refresh periodically
410
- P . delay_unblocking ( this . START_REFRESH_THRESHOLD ) . then ( this . refresh ) ;
411
416
return res ;
412
417
}
413
418
@@ -418,18 +423,23 @@ class SystemStore extends EventEmitter {
418
423
try {
419
424
dbg . log3 ( 'SystemStore: loading ... this.last_update_time =' , this . last_update_time , ", since =" , since ) ;
420
425
421
- const new_data = new SystemStoreData ( ) ;
422
-
423
426
// If we get a load request with an timestamp older then our last update time
424
427
// we ensure we load everyting from that timestamp by updating our last_update_time.
425
428
if ( ! _ . isUndefined ( since ) && since < this . last_update_time ) {
426
429
dbg . log0 ( 'SystemStore.load: Got load request with a timestamp' , since , 'older than my last update time' , this . last_update_time ) ;
427
430
this . last_update_time = since ;
428
431
}
429
432
this . master_key_manager . load_root_key ( ) ;
433
+ const new_data = new SystemStoreData ( ) ;
430
434
let millistamp = time_utils . millistamp ( ) ;
431
435
await this . _register_for_changes ( ) ;
432
- await this . _read_new_data_from_db ( new_data ) ;
436
+ if ( this . source === SOURCE . DB ) {
437
+ await this . _read_new_data_from_db ( new_data ) ;
438
+ } else {
439
+ this . data = new SystemStoreData ( ) ;
440
+ await this . _read_new_data_from_core ( this . data ) ;
441
+ }
442
+
433
443
const secret = await os_utils . read_server_secret ( ) ;
434
444
this . _server_secret = secret ;
435
445
if ( dbg . should_log ( 1 ) ) { //param should match below logs' level
@@ -439,8 +449,10 @@ class SystemStore extends EventEmitter {
439
449
depth : 4
440
450
} ) ) ;
441
451
}
442
- this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
443
- this . data = _ . cloneDeep ( this . old_db_data ) ;
452
+ if ( this . source === SOURCE . DB ) {
453
+ this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
454
+ this . data = _ . cloneDeep ( this . old_db_data ) ;
455
+ }
444
456
millistamp = time_utils . millistamp ( ) ;
445
457
this . data . rebuild ( ) ;
446
458
dbg . log1 ( 'SystemStore: rebuild took' , time_utils . millitook ( millistamp ) ) ;
@@ -462,6 +474,12 @@ class SystemStore extends EventEmitter {
462
474
} ) ;
463
475
}
464
476
477
+ //return the latest copy of in-memory data
478
+ async recent_db_data ( ) {
479
+ //return this.db_clone;
480
+ return this . _load_serial . surround ( async ( ) => this . old_db_data ) ;
481
+ }
482
+
465
483
_update_data_from_new ( data , new_data ) {
466
484
COLLECTIONS . forEach ( col => {
467
485
const old_items = data [ col . name ] ;
@@ -527,6 +545,27 @@ class SystemStore extends EventEmitter {
527
545
this . last_update_time = now ;
528
546
}
529
547
548
+ async _read_new_data_from_core ( target ) {
549
+ dbg . log0 ( "_read_new_data_from_core" ) ;
550
+ const res = await server_rpc . client . system . get_system_store ( ) ;
551
+ for ( const key of Object . keys ( res ) ) {
552
+ const collection = COLLECTIONS_BY_NAME [ key ] ;
553
+ if ( collection ) {
554
+ target [ key ] = [ ] ;
555
+ _ . each ( res [ key ] , item => {
556
+ //these two lines will transform string values into appropriately typed objects
557
+ //(SensitiveString, ObjectId) according to schema
558
+ const after = decode_json ( collection . schema , item ) ;
559
+ db_client . instance ( ) . validate ( key , after ) ;
560
+ target [ key ] . push ( after ) ;
561
+ } ) ;
562
+ } else {
563
+ target [ key ] = res [ key ] ;
564
+ }
565
+ }
566
+ return res ;
567
+ }
568
+
530
569
_check_schema ( col , item , warn ) {
531
570
return db_client . instance ( ) . validate ( col . name , item , warn ) ;
532
571
}
@@ -619,7 +658,7 @@ class SystemStore extends EventEmitter {
619
658
if ( any_news ) {
620
659
if ( this . is_standalone ) {
621
660
await this . load ( last_update ) ;
622
- } else if ( publish ) {
661
+ } else /* if (publish)*/ {
623
662
// notify all the cluster (including myself) to reload
624
663
await server_rpc . client . redirector . publish_to_cluster ( {
625
664
method_api : 'server_inter_process_api' ,
@@ -855,3 +894,4 @@ SystemStore._instance = undefined;
855
894
// EXPORTS
856
895
exports . SystemStore = SystemStore ;
857
896
exports . get_instance = SystemStore . get_instance ;
897
+ exports . SOURCE = SOURCE ;
0 commit comments