Skip to content

Commit 627ef67

Browse files
authored (author name not captured in this page extraction)
Merge pull request #8922 from liranmauda/liran-remove-pwhile
Refactor | Remove deprecated pwhile from the code in favor of async/await
2 parents 04d2ead + 70cd695 commit 627ef67

File tree

11 files changed

+410
-493
lines changed

11 files changed

+410
-493
lines changed

src/deploy/ec2_wrapper.js

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ function verify_demo_system(ip) {
352352
});
353353
}
354354

355-
function put_object(ip, source, bucket, key, timeout, throw_on_error) {
355+
async function put_object(ip, source, bucket, key, timeout, throw_on_error) {
356356
load_demo_config_env(); //switch to Demo system
357357

358358
const rest_endpoint = 'http://' + ip + ':80';
@@ -374,50 +374,44 @@ function put_object(ip, source, bucket, key, timeout, throw_on_error) {
374374
};
375375
console.log('about to upload object', params);
376376
let start_ts = Date.now();
377-
return P.ninvoke(s3bucket, 'upload', params)
378-
.then(function(res) {
379-
console.log('Uploaded object took', (Date.now() - start_ts) / 1000, 'seconds, result', res);
380-
load_aws_config_env(); //back to EC2/S3
381377

382-
}, function(err) {
383-
const wait_limit_in_sec = timeout || 1200;
384-
const start_moment = moment();
385-
let wait_for_agents = (err.statusCode === 500 || err.statusCode === 403);
386-
console.log('failed to upload object in loop', err.statusCode, wait_for_agents);
387-
return P.pwhile(
388-
function() {
389-
return wait_for_agents;
390-
},
391-
function() {
392-
return P.fcall(function() {
393-
//switch to Demo system
394-
return load_demo_config_env();
395-
}).then(function() {
396-
params.Body = fs.createReadStream(source);
397-
start_ts = Date.now();
398-
return P.ninvoke(s3bucket, 'upload', params)
399-
.then(function(res) {
400-
console.log('Uploaded object took', (Date.now() - start_ts) / 1000, 'seconds, result', res);
401-
load_aws_config_env(); //back to EC2/S3
402-
wait_for_agents = false;
403-
404-
}, function(err2) {
405-
console.log('failed to upload. Will wait 10 seconds and retry. err', err2.statusCode);
406-
const curr_time = moment();
407-
if (curr_time.subtract(wait_limit_in_sec, 'second') > start_moment) {
408-
console.error('failed to upload. cannot wait any more', err2.statusCode);
409-
load_aws_config_env(); //back to EC2/S3
410-
wait_for_agents = false;
411-
if (throw_on_error) {
412-
throw new Error(err2);
413-
}
414-
} else {
415-
return P.delay(10000);
416-
}
417-
});
418-
});
419-
});
420-
});
378+
try {
379+
const res = await s3bucket.upload(params).promise();
380+
console.log('Uploaded object took', (Date.now() - start_ts) / 1000, 'seconds, result', res);
381+
load_aws_config_env(); //back to EC2/S3
382+
} catch (err) {
383+
console.log('failed to upload object in loop', err.statusCode);
384+
if (![500, 403].includes(err.statusCode)) throw err;
385+
386+
const wait_limit_in_sec = timeout || 1200;
387+
const start_moment = moment();
388+
389+
/* eslint-disable no-constant-condition */
390+
while (true) {
391+
const curr_time = moment();
392+
if (curr_time.subtract(wait_limit_in_sec, 'second') > start_moment) {
393+
console.error('failed to upload. cannot wait any more', err.statusCode);
394+
load_aws_config_env(); //back to EC2/S3
395+
if (throw_on_error) throw new Error(err);
396+
return;
397+
}
398+
399+
await P.delay(10000);
400+
401+
try {
402+
load_demo_config_env(); //switch to Demo system
403+
params.Body = fs.createReadStream(source);
404+
start_ts = Date.now();
405+
const res = await s3bucket.upload(params).promise();
406+
console.log('Uploaded object took', (Date.now() - start_ts) / 1000, 'seconds, result', res);
407+
load_aws_config_env(); //back to EC2/S3
408+
return res;
409+
} catch (err2) {
410+
console.log('failed to upload. Will wait 10 seconds and retry. err', err2.statusCode);
411+
}
412+
}
413+
}
414+
421415
}
422416

423417
function get_object(ip, obj_path) {

src/hosted_agents/hosted_agents.js

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ class HostedAgents {
7979
}
8080

8181

82-
_monitor_stats() {
83-
P.pwhile(() => true, () => {
82+
async _monitor_stats() {
83+
/* eslint-disable no-constant-condition */
84+
while (true) {
8485
const cpu_usage = process.cpuUsage(this.cpu_usage); //usage since last sample
8586
const mem_usage = process.memoryUsage();
8687
dbg.log0(`hosted_agent_stats_titles - process: cpu_usage_user, cpu_usage_sys, mem_usage_rss`);
@@ -94,12 +95,10 @@ class HostedAgents {
9495
}
9596
}
9697
this.cpu_usage = cpu_usage;
97-
return P.delay(60000);
98-
});
98+
await P.delay(60000);
99+
}
99100
}
100101

101-
102-
103102
async _start_pool_agent(pool) {
104103
if (!this._started) return;
105104
if (!pool) throw new Error(`Internal error: received pool ${pool}`);

src/test/pipeline/dataset.js

Lines changed: 92 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -725,117 +725,103 @@ async function upload_new_files() {
725725
console.timeEnd('dataset upload');
726726
}
727727

728-
function run_test(throw_on_fail) {
729-
return P.resolve()
730-
.then(() => log_journal_file(`${CFG_MARKER}${DATASET_NAME}-${JSON.stringify(TEST_CFG)}`))
731-
.then(() => upload_new_files()
732-
// aging
733-
.then(() => {
734-
if (argv.no_aging) {
735-
console.log('skipping aging stage');
736-
return;
737-
}
738-
TEST_STATE.aging = true;
739-
const start = Date.now();
740-
if (TEST_CFG.aging_timeout !== 0) {
741-
console.log(`will run aging for ${TEST_CFG.aging_timeout} minutes`);
742-
}
743-
return P.pwhile(() =>
744-
(TEST_CFG.aging_timeout === 0 || ((Date.now() - start) / (60 * 1000)) < TEST_CFG.aging_timeout), () => {
745-
console.log(`Aging... currently uploaded ${TEST_STATE.current_size} ${TEST_CFG.size_units} from desired ${
746-
TEST_CFG.dataset_size} ${TEST_CFG.size_units}`);
747-
let action_type;
748-
if (TEST_STATE.current_size > TEST_CFG.dataset_size) {
749-
console.log(`${Yellow}the current dataset size is ${
750-
TEST_STATE.current_size}${TEST_CFG.size_units} and the requested dataset size is ${
751-
TEST_CFG.dataset_size}${TEST_CFG.size_units}, going to delete${NC}`);
752-
action_type = Math.round(Math.random()) ? 'DELETE' : 'MULTI_DELETE';
753-
} else {
754-
action_type = 'RANDOM';
755-
}
756-
return P.resolve()
757-
.then(() => act_and_log(action_type));
758-
});
759-
})
760-
.then(async () => {
761-
await report.report();
762-
})
763-
.then(() => {
764-
console.log(`Everything finished with success!`);
765-
if (!TEST_CFG.no_exit_on_success) process.exit(0);
766-
})
767-
.catch(async err => {
768-
await report.report();
769-
console.error(`Errors during test`, err);
770-
if (throw_on_fail) {
771-
throw new Error(`dataset failed`);
728+
async function run_test(throw_on_fail) {
729+
try {
730+
await log_journal_file(`${CFG_MARKER}${DATASET_NAME}-${JSON.stringify(TEST_CFG)}`);
731+
await upload_new_files();
732+
733+
// aging
734+
if (argv.no_aging) {
735+
console.log('skipping aging stage');
736+
return;
737+
} else {
738+
TEST_STATE.aging = true;
739+
const start = Date.now();
740+
if (TEST_CFG.aging_timeout !== 0) {
741+
console.log(`will run aging for ${TEST_CFG.aging_timeout} minutes`);
742+
}
743+
while ((TEST_CFG.aging_timeout === 0 || ((Date.now() - start) / (60 * 1000)) < TEST_CFG.aging_timeout)) {
744+
console.log(`Aging... currently uploaded ${TEST_STATE.current_size} ${TEST_CFG.size_units} from desired ${
745+
TEST_CFG.dataset_size} ${TEST_CFG.size_units}`);
746+
let action_type;
747+
if (TEST_STATE.current_size > TEST_CFG.dataset_size) {
748+
console.log(`${Yellow}the current dataset size is ${
749+
TEST_STATE.current_size}${TEST_CFG.size_units} and the requested dataset size is ${
750+
TEST_CFG.dataset_size}${TEST_CFG.size_units}, going to delete${NC}`);
751+
action_type = Math.round(Math.random()) ? 'DELETE' : 'MULTI_DELETE';
772752
} else {
773-
process.exit(4);
753+
action_type = 'RANDOM';
774754
}
775-
}));
755+
await act_and_log(action_type);
756+
}
757+
}
758+
await report.report();
759+
760+
console.log(`Everything finished with success!`);
761+
if (!TEST_CFG.no_exit_on_success) process.exit(0);
762+
763+
} catch (err) {
764+
await report.report();
765+
console.error(`Errors during test`, err);
766+
if (throw_on_fail) {
767+
throw new Error(`dataset failed`);
768+
} else {
769+
process.exit(4);
770+
}
771+
}
776772
}
777773

778-
function run_replay() {
779-
const journal = [];
780-
const readfile = readline.createInterface({
781-
input: fs.createReadStream(argv.replay),
782-
terminal: false
783-
});
784-
return new Promise((resolve, reject) => {
785-
readfile
786-
.on('line', line => journal.push(line))
787-
.once('error', reject)
788-
.once('close', resolve);
789-
})
790-
.then(() => {
791-
//first line should contain the TEST_CFG
792-
console.log(`journal[0] ${journal[0]}`);
793-
if (!journal[0].startsWith(`${CFG_MARKER}`)) {
794-
console.error('Expected CFG as first line of replay');
795-
process.exit(5);
796-
}
797-
TEST_CFG = JSON.parse(journal[0].slice(CFG_MARKER.length + DATASET_NAME.length + 1));
798-
let iline = 1;
799-
let idx;
800-
let current_action;
801-
let current_params;
802-
return P.pwhile(
803-
() => iline < journal.length,
804-
() => P.resolve()
805-
.then(() => {
806-
//split action from params
807-
current_params = JSON.parse(journal[iline].slice(ACTION_MARKER.length));
808-
current_action = current_params.action;
809-
delete current_params.action;
810-
console.log(`Calling ${current_action} with parameters ${util.inspect(current_params)}`);
811-
//run single selected activity
812-
idx = _.findIndex(ACTION_TYPES, ac => ac.name === current_action);
813-
if (idx === -1) {
814-
console.error(`Cannot find action ${current_action}`);
815-
process.exit(1);
816-
} else {
817-
return ACTION_TYPES[idx].action(current_params);
818-
}
819-
820-
})
821-
.then(() => report.success(current_action))
822-
.catch(err => report.fail(current_action)
823-
.then(() => {
824-
console.error(`Failed replaying action ${current_action} with ${err}`);
825-
throw err;
826-
})
827-
)
828-
.finally(() => {
829-
iline += 1;
830-
})
831-
);
832-
})
833-
.then(async () => report.report())
834-
.catch(async err => {
835-
await report.report();
836-
console.error('Failed replaying journal file ', err);
837-
process.exit(5);
774+
async function run_replay() {
775+
try {
776+
const journal = [];
777+
const readfile = readline.createInterface({
778+
input: fs.createReadStream(argv.replay),
779+
terminal: false
838780
});
781+
782+
for await (const line of readfile) {
783+
journal.push(line);
784+
}
785+
786+
//first line should contain the TEST_CFG
787+
console.log(`journal[0] ${journal[0]}`);
788+
if (!journal[0].startsWith(`${CFG_MARKER}`)) {
789+
console.error('Expected CFG as first line of replay');
790+
process.exit(5);
791+
}
792+
793+
TEST_CFG = JSON.parse(journal[0].slice(CFG_MARKER.length + DATASET_NAME.length + 1));
794+
795+
for (let iline = 1; iline < journal.length; iline++) {
796+
let current_action;
797+
try {
798+
//split action from params
799+
const current_params = JSON.parse(journal[iline].slice(ACTION_MARKER.length));
800+
current_action = current_params.action;
801+
delete current_params.action;
802+
console.log(`Calling ${current_action} with parameters ${util.inspect(current_params)}`);
803+
//run single selected activity
804+
const idx = _.findIndex(ACTION_TYPES, ac => ac.name === current_action);
805+
if (idx === -1) {
806+
console.error(`Cannot find action ${current_action}`);
807+
process.exit(1);
808+
}
809+
810+
await ACTION_TYPES[idx].action(current_params);
811+
report.success(current_action);
812+
} catch (err) {
813+
console.error(`Failed action ${journal[iline]} with error ${err}`);
814+
report.fail(current_action);
815+
throw err;
816+
}
817+
}
818+
819+
await report.report();
820+
} catch (err) {
821+
await report.report();
822+
console.error('Failed replaying journal file ', err);
823+
process.exit(5);
824+
}
839825
}
840826

841827
async function main() {

0 commit comments

Comments (0)