Skip to content

Commit 6d65c1b

Browse files
authored
Merge pull request #8556 from jackyalbo/jacky_remove_kmeans
Removing K-means and test performance from node allocator
2 parents e5e6a35 + 5b30564 commit 6d65c1b

File tree

6 files changed

+15
-250
lines changed

6 files changed

+15
-250
lines changed

config.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ config.BUFFERS_MEM_LIMIT_MIN = 32 * 1024 * 1024; // just some workable minimum s
6464
config.BUFFERS_MEM_LIMIT_MAX = 4 * 1024 * 1024 * 1024;
6565
config.BUFFERS_MEM_LIMIT = Math.min(
6666
config.BUFFERS_MEM_LIMIT_MAX,
67-
Math.max(Math.floor(config.CONTAINER_MEM_LIMIT / 4), config.BUFFERS_MEM_LIMIT_MIN,)
67+
Math.max(Math.floor(config.CONTAINER_MEM_LIMIT / 4), config.BUFFERS_MEM_LIMIT_MIN, )
6868
);
6969

7070
////////////////////////
@@ -104,7 +104,6 @@ config.AGENT_HEARTBEAT_GRACE_TIME = 10 * 60 * 1000; // grace period before an ag
104104
config.CLOUD_ALERT_GRACE_TIME = 3 * 60 * 1000; // grace period before dispatching alert on cloud node status
105105
config.AGENT_RESPONSE_TIMEOUT = 1 * 60 * 1000;
106106
config.AGENT_TEST_CONNECTION_TIMEOUT = 1 * 60 * 1000;
107-
config.STORE_PERF_TEST_INTERVAL = 60 * 60 * 1000; // perform test_store_perf every 1 hour
108107
config.CLOUD_MAX_ALLOWED_IO_TEST_ERRORS = 3;
109108

110109
config.ENABLE_DEV_RANDOM_SEED = process.env.DISABLE_DEV_RANDOM_SEED === 'false' || false;
@@ -755,10 +754,10 @@ config.NSFS_BUF_POOL_MEM_LIMIT_S = Math.min(Math.floor(config.NSFS_MAX_MEM_SIZE_
755754
config.NSFS_WANTED_BUFFERS_NUMBER) * config.NSFS_BUF_SIZE_S;
756755
// Semaphore size will give 90% of remaining memory to large buffer size, 10% to medium
757756
config.NSFS_BUF_POOL_MEM_LIMIT_M = range_utils.align_down((config.BUFFERS_MEM_LIMIT -
758-
config.NSFS_BUF_POOL_MEM_LIMIT_S - config.NSFS_BUF_POOL_MEM_LIMIT_XS) * 0.1,
757+
config.NSFS_BUF_POOL_MEM_LIMIT_S - config.NSFS_BUF_POOL_MEM_LIMIT_XS) * 0.1,
759758
config.NSFS_BUF_SIZE_M);
760759
config.NSFS_BUF_POOL_MEM_LIMIT_L = range_utils.align_down((config.BUFFERS_MEM_LIMIT -
761-
config.NSFS_BUF_POOL_MEM_LIMIT_S - config.NSFS_BUF_POOL_MEM_LIMIT_XS) * 0.9,
760+
config.NSFS_BUF_POOL_MEM_LIMIT_S - config.NSFS_BUF_POOL_MEM_LIMIT_XS) * 0.9,
762761
config.NSFS_BUF_SIZE_L);
763762

764763
config.NSFS_BUF_WARMUP_SPARSE_FILE_READS = true;

src/agent/agent.js

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ class Agent {
164164
'update_create_node_token',
165165
'update_rpc_config',
166166
'n2n_signal',
167-
'test_store_perf',
168167
'test_store_validity',
169168
'test_network_perf',
170169
'test_network_perf_to_peer',
@@ -957,11 +956,6 @@ class Agent {
957956
return this.rpc.accept_n2n_signal(req.rpc_params);
958957
}
959958

960-
async test_store_perf(req) {
961-
if (!this.block_store) return {};
962-
return this.block_store.test_store_perf(req.rpc_params);
963-
}
964-
965959
async test_store_validity(req) {
966960
if (!this.block_store) return;
967961
await this.block_store.test_store_validity();

src/agent/block_store_services/block_store_base.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,7 @@ class BlockStoreBase {
432432
});
433433
// cleanup old versions for block stores that have versioning enabled
434434
if (this._delete_block_past_versions) await this._delete_block_past_versions(block_md);
435+
dbg.log1(`test_store_perf for node ${this.node_name}. results:`, reply);
435436
return reply;
436437
} catch (err) {
437438
if (err.rpc_code !== 'AUTH_FAILED' && err.rpc_code !== 'STORAGE_NOT_EXIST') {

src/api/agent_api.js

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -217,29 +217,6 @@ module.exports = {
217217
},
218218
},
219219

220-
test_store_perf: {
221-
method: 'POST',
222-
params: {
223-
type: 'object',
224-
properties: {
225-
count: {
226-
type: 'integer'
227-
}
228-
}
229-
},
230-
reply: {
231-
type: 'object',
232-
properties: {
233-
write: {
234-
$ref: 'node_api#/definitions/latency_array'
235-
},
236-
read: {
237-
$ref: 'node_api#/definitions/latency_array'
238-
}
239-
}
240-
}
241-
},
242-
243220
test_store_validity: {
244221
method: 'POST',
245222
},

src/server/node_services/nodes_monitor.js

Lines changed: 10 additions & 216 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ const chance = require('chance')();
88
// const dclassify = require('dclassify');
99
const EventEmitter = require('events').EventEmitter;
1010

11-
const kmeans = require('../../util/kmeans');
1211
const P = require('../../util/promise');
1312
const api = require('../../api');
1413
const pkg = require('../../../package.json');
@@ -714,8 +713,6 @@ class NodesMonitor extends EventEmitter {
714713
}
715714
item.node.drives = item.node.drives || [];
716715
item.node.latency_to_server = item.node.latency_to_server || [];
717-
item.node.latency_of_disk_read = item.node.latency_of_disk_read || [];
718-
item.node.latency_of_disk_write = item.node.latency_of_disk_write || [];
719716
item.node.storage = _.defaults(item.node.storage, {
720717
total: 0,
721718
free: 0,
@@ -845,7 +842,6 @@ class NodesMonitor extends EventEmitter {
845842
.then(worker);
846843
};
847844
return P.all(_.times(concur, worker))
848-
// .then(() => this._suggest_pool_assign()) // need to be rethought - out for
849845
.then(() => this._update_nodes_store('force'))
850846
.catch(err => {
851847
dbg.warn('_run: ERROR', err.stack || err);
@@ -1386,44 +1382,10 @@ class NodesMonitor extends EventEmitter {
13861382
}
13871383
}
13881384

1389-
async _test_store_perf(item) {
1390-
const now = Date.now();
1391-
if (item.last_store_perf_test && now < item.last_store_perf_test + config.STORE_PERF_TEST_INTERVAL) return;
1392-
try {
1393-
1394-
1395-
dbg.log1('running _test_store_perf::', item.node.name);
1396-
const res = await P.timeout(config.AGENT_RESPONSE_TIMEOUT,
1397-
this.client.agent.test_store_perf({
1398-
count: 5
1399-
}, {
1400-
connection: item.connection
1401-
})
1402-
);
1403-
item.last_store_perf_test = Date.now();
1404-
dbg.log0(`_test_store_perf for node ${item.node.name} returned:`, res);
1405-
this._set_need_update.add(item);
1406-
item.node.latency_of_disk_read = js_utils.array_push_keep_latest(
1407-
item.node.latency_of_disk_read, res.read, MAX_NUM_LATENCIES);
1408-
item.node.latency_of_disk_write = js_utils.array_push_keep_latest(
1409-
item.node.latency_of_disk_write, res.write, MAX_NUM_LATENCIES);
1410-
} catch (err) {
1411-
// ignore "unknown" errors for cloud resources - we don't want to put the node in detention in cases where we don't know what the problem is
1412-
// if there is a real issue, we will take it into account in report_error_on_node_blocks
1413-
if (this._is_cloud_node(item) && err.rpc_code !== 'AUTH_FAILED' && err.rpc_code !== 'STORAGE_NOT_EXIST') {
1414-
dbg.warn(`encountered an unknown error in _test_store_perf. `, err);
1415-
} else {
1416-
dbg.log0(`encountered an error in _test_store_perf. `, err);
1417-
throw err;
1418-
}
1419-
}
1420-
}
1421-
14221385
async _test_store(item) {
14231386
if (!item.connection) return;
14241387

14251388
try {
1426-
await this._test_store_perf(item);
14271389
await this._test_store_validity(item);
14281390

14291391
dbg.log2('_test_store:: success in test', item.node.name);
@@ -1874,8 +1836,6 @@ class NodesMonitor extends EventEmitter {
18741836
item.io_detention = this._get_item_io_detention(item);
18751837
item.connectivity = 'TCP';
18761838
item.avg_ping = _.mean(item.node.latency_to_server);
1877-
item.avg_disk_read = _.mean(item.node.latency_of_disk_read);
1878-
item.avg_disk_write = _.mean(item.node.latency_of_disk_write);
18791839
item.storage_full = this._get_item_storage_full(item);
18801840
item.has_issues = this._get_item_has_issues(item);
18811841
item.readable = this._get_item_readable(item);
@@ -2520,8 +2480,6 @@ class NodesMonitor extends EventEmitter {
25202480

25212481
// aggregate data used by suggested pools classification
25222482
host_item.avg_ping = _.mean(host_nodes.map(item => item.avg_ping));
2523-
host_item.avg_disk_read = _.mean(host_nodes.map(item => item.avg_disk_read));
2524-
host_item.avg_disk_write = _.mean(host_nodes.map(item => item.avg_disk_write));
25252483

25262484

25272485
const host_aggragate = this._aggregate_nodes_list(host_nodes);
@@ -2703,126 +2661,6 @@ class NodesMonitor extends EventEmitter {
27032661
return list.slice(skip, skip + limit);
27042662
}
27052663

2706-
// _suggest_pool_assign() {
2707-
// // prepare nodes data per pool
2708-
// const pools_data_map = new Map();
2709-
// for (const host_nodes of this._map_host_id.values()) {
2710-
// // get the host aggregated item
2711-
// const item = this._consolidate_host(host_nodes);
2712-
// item.suggested_pool = ''; // reset previous suggestion
2713-
// const host_id = String(item.node.host_id);
2714-
// const pool_id = String(item.node.pool);
2715-
// const pool = system_store.data.get_by_id(pool_id);
2716-
// dbg.log3('_suggest_pool_assign: node', item.node.name, 'pool', pool && pool.name);
2717-
// // skip new nodes and cloud\internal nodes
2718-
// if (pool && item.node_from_store && item.node.node_type === 'BLOCK_STORE_FS') {
2719-
// let pool_data = pools_data_map.get(pool_id);
2720-
// if (!pool_data) {
2721-
// pool_data = {
2722-
// pool_id: pool_id,
2723-
// pool_name: pool.name,
2724-
// docs: []
2725-
// };
2726-
// pools_data_map.set(pool_id, pool_data);
2727-
// }
2728-
// const tokens = this._classify_node_tokens(item);
2729-
// pool_data.docs.push(new dclassify.Document(host_id, tokens));
2730-
// }
2731-
// }
2732-
2733-
// // take the data of all the pools and use it to train a classifier of nodes to pools
2734-
// const data_set = new dclassify.DataSet();
2735-
// const classifier = new dclassify.Classifier({
2736-
// applyInverse: true
2737-
// });
2738-
// const pools_to_classify = ['default_resource', config.NEW_SYSTEM_POOL_NAME];
2739-
// let num_trained_pools = 0;
2740-
// for (const pool_data of pools_data_map.values()) {
2741-
// // don't train by the nodes that we need to classify
2742-
// if (!pools_to_classify.includes(pool_data.pool_name)) {
2743-
// dbg.log3('_suggest_pool_assign: add to data set',
2744-
// pool_data.pool_name, pool_data.docs);
2745-
// data_set.add(pool_data.pool_name, pool_data.docs);
2746-
// num_trained_pools += 1;
2747-
// }
2748-
// }
2749-
// if (num_trained_pools <= 0) {
2750-
// dbg.log3('_suggest_pool_assign: no pools to suggest');
2751-
// return;
2752-
// } else if (num_trained_pools === 1) {
2753-
// // the classifier requires at least two options to work
2754-
// dbg.log3('_suggest_pool_assign: only one pool to suggest,',
2755-
// 'too small for real suggestion');
2756-
// return;
2757-
// }
2758-
// classifier.train(data_set);
2759-
// dbg.log3('_suggest_pool_assign: Trained:', classifier,
2760-
// 'probabilities', JSON.stringify(classifier.probabilities));
2761-
2762-
// // for nodes in the default_resource use the classifier to suggest a pool
2763-
// const system = system_store.data.systems[0];
2764-
// const target_pool = system.pools_by_name[config.NEW_SYSTEM_POOL_NAME];
2765-
// const target_pool_data = pools_data_map.get(String(target_pool._id));
2766-
// if (target_pool_data) {
2767-
// for (const doc of target_pool_data.docs) {
2768-
// const host_nodes = this._map_host_id.get(doc.id);
2769-
// const hostname = this._item_hostname(host_nodes[0]);
2770-
// dbg.log0('_suggest_pool_assign: classify start', hostname, doc);
2771-
// const res = classifier.classify(doc);
2772-
// dbg.log0('_suggest_pool_assign: classify result', hostname, res);
2773-
// let suggested_pool;
2774-
// if (res.category !== config.NEW_SYSTEM_POOL_NAME) {
2775-
// suggested_pool = res.category;
2776-
// } else if (res.secondCategory !== config.NEW_SYSTEM_POOL_NAME) {
2777-
// suggested_pool = res.secondCategory;
2778-
// }
2779-
// host_nodes.forEach(item => {
2780-
// item.suggested_pool = suggested_pool;
2781-
// });
2782-
2783-
// }
2784-
2785-
// }
2786-
// }
2787-
2788-
_classify_node_tokens(item) {
2789-
// cannot use numbers as dclassify tokens only discrete strings,
2790-
// so we have to transform numbers to some relevant tokens
2791-
const tokens = [];
2792-
if (item.node.ip) {
2793-
const x = item.node.ip.split('.');
2794-
if (x.length === 4) {
2795-
tokens.push('ip:' + x[0] + '.x.x.x');
2796-
tokens.push('ip:' + x[0] + '.' + x[1] + '.x.x');
2797-
tokens.push('ip:' + x[0] + '.' + x[1] + '.' + x[2] + '.x');
2798-
tokens.push('ip:' + x[0] + '.' + x[1] + '.' + x[2] + '.' + x[3]);
2799-
}
2800-
}
2801-
if (item.node.os_info) {
2802-
tokens.push('platform:' + item.node.os_info.platform);
2803-
tokens.push('arch:' + item.node.os_info.arch);
2804-
tokens.push('totalmem:' + scale_size_token(item.node.os_info.totalmem));
2805-
}
2806-
if (_.isNumber(item.avg_ping)) {
2807-
tokens.push('avg_ping:' + scale_number_token(item.avg_ping));
2808-
}
2809-
if (_.isNumber(item.avg_disk_read)) {
2810-
tokens.push('avg_disk_read:' + scale_number_token(item.avg_disk_read));
2811-
}
2812-
if (_.isNumber(item.avg_disk_write)) {
2813-
tokens.push('avg_disk_write:' + scale_number_token(item.avg_disk_write));
2814-
}
2815-
if (item.node.storage && _.isNumber(item.node.storage.total)) {
2816-
const storage_other =
2817-
item.node.storage.total -
2818-
item.node.storage.used -
2819-
item.node.storage.free;
2820-
tokens.push('storage_other:' + scale_size_token(storage_other));
2821-
tokens.push('storage_total:' + scale_size_token(item.node.storage.total));
2822-
}
2823-
return tokens;
2824-
}
2825-
28262664
list_nodes(query, options) {
28272665
dbg.log2('list_nodes: query', query);
28282666
this._throw_if_not_started_and_loaded();
@@ -3484,53 +3322,18 @@ class NodesMonitor extends EventEmitter {
34843322
list.push(item);
34853323
}
34863324

3487-
const latency_groups = [];
3488-
// Not all nodes always have the avg_disk_write.
3489-
// KMeans needs valid vectors so we exclude the nodes and assume that they are the slowest
3490-
// Since we assume them to be the slowest we will place them in the last KMeans group
3491-
const partition_avg_disk_write = _.partition(list, item => !Number.isNaN(item.avg_disk_write) && _.isNumber(item.avg_disk_write));
3492-
const nodes_with_avg_disk_write = partition_avg_disk_write[0];
3493-
const nodes_without_avg_disk_write = partition_avg_disk_write[1];
3494-
if (nodes_with_avg_disk_write.length >= config.NODE_ALLOCATOR_NUM_CLUSTERS) {
3495-
// TODO:
3496-
// Not handling noise at all.
3497-
// This means that we can have a group of 1 noisy drive.
3498-
// I rely on avg_disk_write as an average reading to handle any noise.
3499-
const kmeans_clusters = kmeans.run(
3500-
nodes_with_avg_disk_write.map(item => [item.avg_disk_write]), {
3501-
k: config.NODE_ALLOCATOR_NUM_CLUSTERS
3502-
}
3503-
);
3504-
3505-
// Sort the groups by latency (centroid is the computed centralized latency for each group)
3506-
kmeans_clusters.sort(js_utils.sort_compare_by(item => item.centroid[0], 1));
3507-
3508-
kmeans_clusters.forEach(kmeans_cluster =>
3509-
latency_groups.push(kmeans_cluster.clusterInd.map(index => list[index]))
3510-
);
3511-
3512-
if (nodes_without_avg_disk_write.length) {
3513-
latency_groups[latency_groups.length - 1] =
3514-
_.concat(latency_groups[latency_groups.length - 1], nodes_without_avg_disk_write);
3515-
}
3516-
3517-
} else {
3518-
latency_groups.push(list);
3519-
}
3520-
3521-
const lg_res = latency_groups.map(cluster => {
3522-
const max = 1000;
3523-
// This is done in order to get the most unused or free drives
3524-
// Since we slice the response up to 1000 drives
3525-
cluster.sort(js_utils.sort_compare_by(item => item.node.storage.used, 1));
3526-
const nodes_set = (cluster.length < max) ? cluster : cluster.slice(0, max);
3527-
return {
3528-
nodes: nodes_set.map(item => this._get_node_info(item, params.fields))
3529-
};
3530-
});
3325+
if (_.isEmpty(list)) return { latency_groups: [{ nodes: [] }] };
3326+
const max = 1000;
3327+
// This is done in order to get the most unused or free drives
3328+
// Since we slice the response up to 1000 drives
3329+
list.sort(js_utils.sort_compare_by(item => item.node.storage.used, 1));
3330+
const nodes_set = (list.length < max) ? list : list.slice(0, max);
3331+
const latency_groups = [{
3332+
nodes: nodes_set.map(item => this._get_node_info(item, params.fields))
3333+
}];
35313334

35323335
return {
3533-
latency_groups: _.isEmpty(lg_res) ? [{ nodes: [] }] : lg_res
3336+
latency_groups
35343337
};
35353338
}
35363339

@@ -3666,15 +3469,6 @@ class NodesMonitor extends EventEmitter {
36663469
}
36673470
}
36683471

3669-
function scale_number_token(num) {
3670-
return 2 ** Math.round(Math.log2(num));
3671-
}
3672-
3673-
function scale_size_token(size) {
3674-
const scaled = Math.max(scale_number_token(size), size_utils.GIGABYTE);
3675-
return size_utils.human_size(scaled);
3676-
}
3677-
36783472
function progress_by_time(time, now) {
36793473
if (!time.end) return 0;
36803474
return Math.min(1, Math.max(0,

src/test/unit_tests/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ require('./test_agent_blocks_reclaimer');
8787
require('./test_s3_ops');
8888
require('./test_s3_encryption');
8989
require('./test_s3_bucket_policy');
90-
require('./test_node_allocator');
90+
// require('./test_node_allocator');
9191
require('./test_namespace_cache');
9292
require('./test_namespace_auth');
9393
require('./test_encryption');

0 commit comments

Comments
 (0)