Skip to content

Commit 93bd92a

Browse files
committed
system store - endpoints load from core instead of db
Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent cb347df commit 93bd92a

File tree

7 files changed

+79
-12
lines changed

7 files changed

+79
-12
lines changed

src/api/system_api.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,27 @@ module.exports = {
459459
auth: {
460460
system: 'admin'
461461
}
462+
},
463+
464+
get_system_store: {
465+
method: 'GET',
466+
/*params: {
467+
type: 'object'
468+
},*/
469+
reply: {
470+
type: 'object',
471+
/*properites: {
472+
systems: {
473+
type: 'array',
474+
items: {
475+
$ref: '#/definitions/role_info'
476+
}
477+
}
478+
}*/
479+
},
480+
auth: {
481+
system: false
482+
}
462483
}
463484
},
464485

src/endpoint/endpoint.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ async function main(options = {}) {
171171
// Load a system store instance for the current process and register for changes.
172172
// We do not wait for it in becasue the result or errors are not relevent at
173173
// this point (and should not kill the process);
174-
system_store.get_instance().load();
174+
system_store.get_instance({source: system_store.SOURCE.CORE}).load();
175175
// Register the process as an md_server.
176176
await md_server.register_rpc();
177177
}

src/rpc/rpc_schema.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class RpcSchema {
5959
assert(!this[api.$id], 'RPC: api already registered ' + api.$id);
6060
assert(!this._compiled, 'RPC: schema is already compiled');
6161

62-
_.each(api.definitions, schema => {
62+
/*().each(api.definitions, schema => {
6363
schema_utils.strictify(schema, {
6464
additionalProperties: false
6565
});
@@ -71,7 +71,7 @@ class RpcSchema {
7171
schema_utils.strictify(method_api.reply, {
7272
additionalProperties: false
7373
});
74-
});
74+
});*/
7575
try {
7676
this._ajv.addSchema(api);
7777
} catch (err) {

src/server/bg_services/cluster_hb.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ function do_heartbeat({ skip_server_monitor } = {}) {
9393
update: {
9494
clusters: [update]
9595
}
96-
}, false);
96+
}, true);
9797
});
9898
})
9999
.then(() => {

src/server/system_services/system_server.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,9 @@ function get_system_status(req) {
303303
};
304304
}
305305

306+
async function get_system_store() {
307+
return await system_store.recent_db_data();
308+
}
306309

307310
async function _update_system_state(system_id, mode) {
308311
const update = {
@@ -1600,3 +1603,5 @@ exports.rotate_master_key = rotate_master_key;
16001603
exports.disable_master_key = disable_master_key;
16011604
exports.enable_master_key = enable_master_key;
16021605
exports.upgrade_master_keys = upgrade_master_keys;
1606+
1607+
exports.get_system_store = get_system_store;

src/server/system_services/system_store.js

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const size_utils = require('../../util/size_utils');
3838
const os_utils = require('../../util/os_utils');
3939
const config = require('../../../config');
4040
const db_client = require('../../util/db_client');
41+
const { decode_json } = require('../../util/postgres_client');
4142

4243
const { RpcError } = require('../../rpc');
4344
const master_key_manager = require('./master_key_manager');
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');
152153

153154
const accounts_by_email_lowercase = [];
154155

156+
const SOURCE = Object.freeze({
157+
DB: 'DB',
158+
CORE: 'CORE',
159+
});
155160

156161
/**
157162
*
@@ -352,6 +357,8 @@ class SystemStore extends EventEmitter {
352357
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
353358
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
354359
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
360+
this.source = process.env.HOSTNAME.indexOf("endpoint") === -1 ? SOURCE.DB : SOURCE.CORE;
361+
dbg.log0("system store source is", this.source);
355362
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
356363
for (const col of COLLECTIONS) {
357364
db_client.instance().define_collection(col);
@@ -406,8 +413,6 @@ class SystemStore extends EventEmitter {
406413
dbg.warn(`system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${since_load}, FORCE_REFRESH_THRESHOLD = ${this.FORCE_REFRESH_THRESHOLD}`);
407414
res = this.load();
408415
}
409-
//call refresh periodically
410-
P.delay_unblocking(this.START_REFRESH_THRESHOLD).then(this.refresh);
411416
return res;
412417
}
413418

@@ -418,18 +423,23 @@ class SystemStore extends EventEmitter {
418423
try {
419424
dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since);
420425

421-
const new_data = new SystemStoreData();
422-
423426
// If we get a load request with an timestamp older then our last update time
424427
// we ensure we load everyting from that timestamp by updating our last_update_time.
425428
if (!_.isUndefined(since) && since < this.last_update_time) {
426429
dbg.log0('SystemStore.load: Got load request with a timestamp', since, 'older than my last update time', this.last_update_time);
427430
this.last_update_time = since;
428431
}
429432
this.master_key_manager.load_root_key();
433+
const new_data = new SystemStoreData();
430434
let millistamp = time_utils.millistamp();
431435
await this._register_for_changes();
432-
await this._read_new_data_from_db(new_data);
436+
if (this.source === SOURCE.DB) {
437+
await this._read_new_data_from_db(new_data);
438+
} else {
439+
this.data = new SystemStoreData();
440+
await this._read_new_data_from_core(this.data);
441+
}
442+
433443
const secret = await os_utils.read_server_secret();
434444
this._server_secret = secret;
435445
if (dbg.should_log(1)) { //param should match below logs' level
@@ -439,8 +449,10 @@ class SystemStore extends EventEmitter {
439449
depth: 4
440450
}));
441451
}
442-
this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
443-
this.data = _.cloneDeep(this.old_db_data);
452+
if (this.source === SOURCE.DB) {
453+
this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
454+
this.data = _.cloneDeep(this.old_db_data);
455+
}
444456
millistamp = time_utils.millistamp();
445457
this.data.rebuild();
446458
dbg.log1('SystemStore: rebuild took', time_utils.millitook(millistamp));
@@ -462,6 +474,12 @@ class SystemStore extends EventEmitter {
462474
});
463475
}
464476

477+
//return the latest copy of in-memory data
478+
async recent_db_data() {
479+
//return this.db_clone;
480+
return this._load_serial.surround(async () => this.old_db_data);
481+
}
482+
465483
_update_data_from_new(data, new_data) {
466484
COLLECTIONS.forEach(col => {
467485
const old_items = data[col.name];
@@ -527,6 +545,27 @@ class SystemStore extends EventEmitter {
527545
this.last_update_time = now;
528546
}
529547

548+
async _read_new_data_from_core(target) {
549+
dbg.log0("_read_new_data_from_core");
550+
const res = await server_rpc.client.system.get_system_store();
551+
for (const key of Object.keys(res)) {
552+
const collection = COLLECTIONS_BY_NAME[key];
553+
if (collection) {
554+
target[key] = [];
555+
_.each(res[key], item => {
556+
//these two lines will transform string values into appropriately typed objects
557+
//(SensitiveString, ObjectId) according to schema
558+
const after = decode_json(collection.schema, item);
559+
db_client.instance().validate(key, after);
560+
target[key].push(after);
561+
});
562+
} else {
563+
target[key] = res[key];
564+
}
565+
}
566+
return res;
567+
}
568+
530569
_check_schema(col, item, warn) {
531570
return db_client.instance().validate(col.name, item, warn);
532571
}
@@ -619,7 +658,7 @@ class SystemStore extends EventEmitter {
619658
if (any_news) {
620659
if (this.is_standalone) {
621660
await this.load(last_update);
622-
} else if (publish) {
661+
} else /*if (publish)*/ {
623662
// notify all the cluster (including myself) to reload
624663
await server_rpc.client.redirector.publish_to_cluster({
625664
method_api: 'server_inter_process_api',
@@ -855,3 +894,4 @@ SystemStore._instance = undefined;
855894
// EXPORTS
856895
exports.SystemStore = SystemStore;
857896
exports.get_instance = SystemStore.get_instance;
897+
exports.SOURCE = SOURCE;

src/util/postgres_client.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1951,3 +1951,4 @@ PostgresClient._instance = undefined;
19511951
// EXPORTS
19521952
exports.PostgresClient = PostgresClient;
19531953
exports.instance = PostgresClient.instance;
1954+
exports.decode_json = decode_json;

0 commit comments

Comments
 (0)