diff --git a/.gitignore b/.gitignore index df3c9e53..d95139da 100644 --- a/.gitignore +++ b/.gitignore @@ -50,4 +50,6 @@ test/project_path.h hash_hex/ *.csv -toolkit/plugins \ No newline at end of file +toolkit/plugins + +*.sealed \ No newline at end of file diff --git a/README.md b/README.md index 87efaca8..e0245703 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Fidelius - 基于可信执行环境的熠智隐私计算中间件 +# Fidelius - YeeZ Privacy Computing English / [中文](doc/README_ZH.md) # Fidelius - YeeZ Privacy Computing diff --git a/test/integrate/test.sh b/test/integrate/test.sh new file mode 100644 index 00000000..90690f79 --- /dev/null +++ b/test/integrate/test.sh @@ -0,0 +1,11 @@ +for i in `seq 1 20`; do + rm -rf *.json + rm -rf *.sealed + rm -rf *.output + ../../bin/test_scheduler + echo "Test $i" + echo "Test $i" + echo "Test $i" + echo "Test $i" + echo "Test $i" +done diff --git a/test/toolkit/scheduler/common.hpp b/test/toolkit/scheduler/common.hpp index 637a6cc1..8a2f52b4 100755 --- a/test/toolkit/scheduler/common.hpp +++ b/test/toolkit/scheduler/common.hpp @@ -55,7 +55,8 @@ namespace cluster { static nlohmann::json fid_terminus(nlohmann::json kwargs) { - spdlog::trace("fid_terminus"); + spdlog::trace("fid_terminus starts"); + spdlog::trace(kwargs.dump()); nlohmann::json ret; @@ -73,10 +74,13 @@ namespace cluster { } std::string output = execute_cmd(cmd); + spdlog::trace(output); ret["cmd"] = cmd; ret["output"] = output; + spdlog::trace("fid_terminus ends"); + return ret; } @@ -257,6 +261,7 @@ namespace cluster { } } std::string output = execute_cmd(cmd); + spdlog::trace(output); ret["cmd"] = cmd; ret["output"] = output; diff --git a/test/toolkit/scheduler/job_step.hpp b/test/toolkit/scheduler/job_step.hpp index 7eb3055d..ac83b3bd 100755 --- a/test/toolkit/scheduler/job_step.hpp +++ b/test/toolkit/scheduler/job_step.hpp @@ -10,11 +10,17 @@ #include #include +#include #include -namespace cluster { - class JobStep { +namespace cluster +{ + class JobStep + { + public: + static std::mutex mutex; + public: static void remove_files(std::vector file_list) { @@ -25,39 +31,48 @@ namespace cluster { std::string cmd = std::string{"rm -rf "} + iter; Common::execute_cmd(cmd); } - } static nlohmann::json gen_key(std::string crypto, std::string shukey_file) { - spdlog::trace("gen_key"); + spdlog::trace("gen_key starts"); nlohmann::json param; param["crypto"] = crypto; param["gen-key"] = ""; param["no-password"] = ""; param["output"] = shukey_file; -// nlohmann::json param = nlohmann::json::parse(R"( -// { -// "crypto": crypto, -// "gen-key": "", -// "no-password": "", -// "output": shukey_file -// } -// )"); + // nlohmann::json param = nlohmann::json::parse(R"( + // { + // "crypto": crypto, + // "gen-key": "", + // "no-password": "", + // "output": shukey_file + // } + // )"); + // JobStep::mutex.lock(); Common::fid_terminus(param); - std::ifstream f(shukey_file); - nlohmann::json data = nlohmann::json::parse(f); + // JobStep::mutex.unlock(); + + // JobStep::mutex.lock(); + spdlog::trace("shukey_file={}", shukey_file); + std::ifstream ifs(shukey_file); + nlohmann::json data = nlohmann::json::parse(ifs); + ifs.close(); + // JobStep::mutex.unlock(); + + spdlog::trace("gen_key ends"); + return data; } static nlohmann::json seal_data( - std::string crypto, - std::string data_url, - std::string plugin_url, - std::string sealed_data_url, - std::string sealed_output, - std::string data_key_file) + std::string crypto, + std::string data_url, + std::string plugin_url, + std::string sealed_data_url, + std::string sealed_output, + std::string data_key_file) { spdlog::trace("seal_data"); @@ -74,37 +89,44 @@ namespace cluster { static std::string read_sealed_output(std::string filepath, std::string field) { - spdlog::trace("read_sealed_output"); - std::ifstream ifs(filepath); + if (!ifs) + { + throw std::runtime_error("fail to open file stream"); + } std::string line; while (std::getline(ifs, line)) { - std::istringstream iss(line); - std::string iss_data; - std::getline(iss, iss_data, '='); + std::vector line_split; + boost::split(line_split, line, boost::is_any_of("=")); std::string key, value; - iss >> key >> value; + key = line_split[0]; + value = line_split[1]; boost::trim(key); boost::trim(value); + if (value.front() == '"') + { + value.erase(0, 1); // erase the first character + value.erase(value.size() - 1); // erase the last character + } if (key == field) { return value; } } + ifs.close(); // FIXME: should report error here return std::string{""}; } static nlohmann::json forward_message( - std::string crypto, - std::string shukey_file, - std::string dian_pkey, - std::string enclave_hash, - std::string forward_result - ) + std::string crypto, + std::string shukey_file, + std::string dian_pkey, + std::string enclave_hash, + std::string forward_result) { spdlog::trace("forward_message starts"); @@ -119,11 +141,15 @@ namespace cluster { { param["use-enclave-hash"] = enclave_hash; } + + // JobStep::mutex.lock(); Common::fid_terminus(param); + // JobStep::mutex.unlock(); spdlog::trace("forward_message: get forward result"); std::ifstream ifs(forward_result); nlohmann::json output = nlohmann::json::parse(ifs); + ifs.close(); spdlog::trace("forward_message ends"); @@ -159,28 +185,32 @@ namespace cluster { return ret; } - static std::string read_parser_hash(std::string parser_url) + static std::string read_parser_hash(std::string name, std::string parser_url) { spdlog::trace("read_parser_hash"); nlohmann::json param; param["enclave"] = parser_url; - param["output"] = "info.json"; + std::string name_url = name + "-info.json"; + param["output"] = name_url; + // JobStep::mutex.lock(); nlohmann::json r = Common::fid_dump(param); + // JobStep::mutex.unlock(); - std::ifstream ifs("info.json"); + std::ifstream ifs(name_url); nlohmann::json data = nlohmann::json::parse(ifs); + ifs.close(); return data["enclave-hash"]; } static nlohmann::json generate_request( - std::string crypto, - std::string input_param, - std::string shukey_file, - std::string param_output_url, - nlohmann::json config) + std::string crypto, + std::string input_param, + std::string shukey_file, + std::string param_output_url, + nlohmann::json config) { spdlog::trace("generate_request starts"); @@ -195,11 +225,15 @@ namespace cluster { std::string r; if (config.contains("request-use-js") && config["request-use-js"] != "") { + // JobStep::mutex.lock(); nlohmann::json r = CommonJs::fid_terminus(param); + // JobStep::mutex.unlock(); } else { + // JobStep::mutex.lock(); nlohmann::json r = Common::fid_terminus(param); + // JobStep::mutex.unlock(); } std::string abs_param_output_url = Common::current_dir / std::filesystem::path(param_output_url); @@ -211,6 +245,7 @@ namespace cluster { } nlohmann::json ret = nlohmann::json::parse(ifs); + ifs.close(); spdlog::trace("generate_request ends"); @@ -218,21 +253,21 @@ namespace cluster { } static nlohmann::json fid_analyzer_tg( - nlohmann::json shukey_json, - nlohmann::json rq_forward_json, - nlohmann::json algo_shu_info, - nlohmann::json algo_forward_json, - std::string enclave_hash, - std::vector input_data, - std::string parser_url, - std::string dian_pkey, - nlohmann::json model, - std::string crypto, - nlohmann::json param_json, - std::vector flat_kgt_pkey_list, - std::vector allowances, - std::string parser_input_file, - std::string parser_output_file) + nlohmann::json shukey_json, + nlohmann::json rq_forward_json, + nlohmann::json algo_shu_info, + nlohmann::json algo_forward_json, + std::string enclave_hash, + std::vector input_data, + std::string parser_url, + std::string dian_pkey, + nlohmann::json model, + std::string crypto, + nlohmann::json param_json, + std::vector flat_kgt_pkey_list, + std::vector allowances, + std::string parser_input_file, + std::string parser_output_file) { spdlog::trace("fid_analyzer_tg starts"); @@ -268,12 +303,15 @@ namespace cluster { param["output"] = parser_output_file; nlohmann::json r = Common::fid_analyzer(param); - try { + try + { std::ifstream ifs(parser_output_file); spdlog::trace("fid_analyzer_tg ends"); - return nlohmann::json::parse(ifs); + nlohmann::json ret = nlohmann::json::parse(ifs); + ifs.close(); + return ret; } - catch (const std::exception& e) + catch (const std::exception &e) { // do nothing spdlog::error(e.what()); @@ -283,4 +321,4 @@ namespace cluster { }; } -#endif //YPC_JOB_STEP_HPP +#endif // YPC_JOB_STEP_HPP diff --git a/test/toolkit/scheduler/test_scheduler.cpp b/test/toolkit/scheduler/test_scheduler.cpp index b960cbdb..8dc20cb7 100755 --- a/test/toolkit/scheduler/test_scheduler.cpp +++ b/test/toolkit/scheduler/test_scheduler.cpp @@ -6,20 +6,22 @@ using namespace cluster; +std::mutex JobStep::mutex; + nlohmann::json decrypt_result( - std::string crypto, - std::string encrypted_result, - std::string kgt_pkey, - std::string key_json_list, - std::string output) + std::string crypto, + std::string encrypted_result, + std::string kgt_pkey, + std::string key_json_list, + std::string output) { - std::string cmd = Common::bin_dir / std::filesystem::path("./result_decrypt") ; + std::string cmd = Common::bin_dir / std::filesystem::path("./result_decrypt"); cmd = cmd + - " --crypto " + crypto + - " --encrypted-result " + encrypted_result + - " --kgt-pkey " + kgt_pkey + - " --key-json-file " + key_json_list + - " --output " + output; + " --crypto " + crypto + + " --encrypted-result " + encrypted_result + + " --kgt-pkey " + kgt_pkey + + " --key-json-file " + key_json_list + + " --output " + output; std::string cmd_output = Common::execute_cmd(cmd); nlohmann::json ret; @@ -29,28 +31,28 @@ nlohmann::json decrypt_result( } TaskGraph_Job::TaskGraph_Job( - std::string crypto, - nlohmann::json all_tasks, - std::vector all_output, - nlohmann::json config, - std::vector key_files) : - crypto(crypto), - all_tasks(all_tasks), - all_outputs(all_output), - config(config), - key_files(key_files) { - + std::string crypto, + nlohmann::json all_tasks, + std::vector all_output, + nlohmann::json config, + std::vector key_files) : crypto(crypto), + all_tasks(all_tasks), + all_outputs(all_output), + config(config), + key_files(key_files) +{ } nlohmann::json TaskGraph_Job::handle_input_data( - nlohmann::json summary, - std::string data_url, - std::string plugin_url, - std::string dian_pkey, - std::string enclave_hash, - uint64_t idx, - std::vector tasks, - std::vector prev_tasks_idx) { + nlohmann::json summary, + std::string data_url, + std::string plugin_url, + std::string dian_pkey, + std::string enclave_hash, + uint64_t idx, + std::vector tasks, + std::vector prev_tasks_idx) +{ spdlog::trace("handle_input_data starts"); @@ -59,15 +61,18 @@ nlohmann::json TaskGraph_Job::handle_input_data( // 1.1 generate data key spdlog::trace("handle_input_data: 1.1 generate data key"); std::string data_key_file = - data_url + - ".data" + - std::to_string(idx) + - ".key.json"; + data_url + + ".data" + + std::to_string(idx) + + ".key.json"; nlohmann::json data_shukey_json = JobStep::gen_key(crypto, data_key_file); + // JobStep::mutex.lock(); key_files.push_back(data_key_file); + // JobStep::mutex.unlock(); // 2. call data provider to seal data spdlog::trace("handle_input_data: 2. call data provider to seal data"); + spdlog::trace("data_url: {}", data_url); std::string sealed_data_url = data_url + ".sealed"; std::string sealed_output = data_url + ".sealed.output"; summary["data-url"] = data_url; @@ -77,34 +82,46 @@ nlohmann::json TaskGraph_Job::handle_input_data( if (prev_tasks_idx.empty()) { + spdlog::trace("prev_tasks_idx empty"); + // JobStep::mutex.lock(); nlohmann::json r = JobStep::seal_data( - crypto, - data_url, - plugin_url, - sealed_data_url, - sealed_output, - data_key_file); + crypto, + data_url, + plugin_url, + sealed_data_url, + sealed_output, + data_key_file); + // JobStep::mutex.unlock(); } else { + spdlog::trace("prev_tasks_idx not empty"); nlohmann::json task = tasks[idx]; std::string name = task["name"]; std::string parser_output_file = name + "_parser_output.json"; + spdlog::trace(parser_output_file); std::ifstream ifs_pof(parser_output_file); + spdlog::trace("fstream opened"); nlohmann::json output_json = nlohmann::json::parse(ifs_pof); - std::cout << output_json << std::endl; - // exit(0); + spdlog::trace("json parsed"); + ifs_pof.close(); + std::cout << "output_json" << output_json << std::endl; + // JobStep::mutex.lock(); nlohmann::json r = intermediate_seal_data( - output_json["encrypted_result"], - sealed_data_url); + output_json["encrypted_result"], + sealed_data_url); + // JobStep::mutex.unlock(); std::ofstream ofs_so(sealed_output); ofs_so << "data_id = " << output_json["intermediate_data_hash"] << std::endl; ofs_so << "pkey_kgt = " << output_json["data_kgt_pkey"] << std::endl; + ofs_so.close(); } std::string data_hash = JobStep::read_sealed_output(sealed_output, "data_id"); + spdlog::trace("sealed_output: {}", sealed_output); + spdlog::trace("data_hash: {}", data_hash); std::string flat_kgt_pkey = JobStep::read_sealed_output(sealed_output, "pkey_kgt"); summary["data-hash"] = data_hash; // print("done seal data with hash: {}, cmd: {}".format(data_hash, r[0])) @@ -113,17 +130,26 @@ nlohmann::json TaskGraph_Job::handle_input_data( spdlog::trace("handle_input_data: render data_forward_json_list"); std::vector data_forward_json_list; - for (auto key_file : key_files) + // for (auto key_file : key_files) + for (size_t i = 0; i < key_files.size(); i++) { + // JobStep::mutex.lock(); + auto key_file = key_files[i]; + // JobStep::mutex.unlock(); + std::ifstream ifs_kf(key_file); nlohmann::json shukey_json = nlohmann::json::parse(ifs_kf); - std::string forward_result = key_file + ".shukey.foward.json"; + ifs_kf.close(); + std::string forward_result = key_file + "." + enclave_hash + ".shukey.foward.json"; + // JobStep::mutex.lock(); nlohmann::json d = JobStep::forward_message( - crypto, - key_file, - dian_pkey, - enclave_hash, - forward_result); + crypto, + key_file, + dian_pkey, + enclave_hash, + forward_result); + // JobStep::mutex.unlock(); + spdlog::trace("forward_message"); nlohmann::json forward_json; forward_json["shu_pkey"] = shukey_json["public-key"]; @@ -133,13 +159,13 @@ nlohmann::json TaskGraph_Job::handle_input_data( all_outputs.push_back(forward_result); data_forward_json_list.push_back(forward_json); - } spdlog::trace("handle_input_data: render data_obj"); nlohmann::json data_obj; data_obj["input_data_url"] = sealed_data_url; data_obj["input_data_hash"] = data_hash; + spdlog::trace("data_hash: {}", data_hash); data_obj["kgt_shu_info"]["kgt_pkey"] = flat_kgt_pkey; data_obj["kgt_shu_info"]["data_shu_infos"] = data_forward_json_list; data_obj["tag"] = "0"; @@ -147,7 +173,6 @@ nlohmann::json TaskGraph_Job::handle_input_data( ret["data_obj"] = data_obj; ret["flat_kgt_pkey"] = flat_kgt_pkey; std::cout << ret << std::endl; - // exit(0); spdlog::trace("handle_input_data ends"); @@ -155,14 +180,19 @@ nlohmann::json TaskGraph_Job::handle_input_data( } nlohmann::json TaskGraph_Job::run( - std::vector tasks, - uint64_t idx, - std::vector prev_tasks_idx) { + std::vector tasks, + uint64_t idx, + std::vector prev_tasks_idx, + nlohmann::json key) +{ + spdlog::trace("run task {} starts", idx); + nlohmann::json ret; nlohmann::json task = tasks[idx]; std::string name = task["name"]; nlohmann::json data_urls = task["data"]; + spdlog::trace("data_urls dump: {}", data_urls.dump()); nlohmann::json plugin_urls = task["reader"]; std::string parser_url = task["parser"]; std::string input_param = task["param"]; @@ -183,33 +213,39 @@ nlohmann::json TaskGraph_Job::run( // get dian pkey spdlog::trace("get dian pkey"); - nlohmann::json key = JobStep::get_first_key(crypto); + // nlohmann::json key = JobStep::get_first_key(crypto); std::string pkey = key["public-key"]; nlohmann::json summary; summary["tee-pkey"] = key["public-key"]; // read parser enclave hash spdlog::trace("read parser enclave hash"); - std::string enclave_hash = JobStep::read_parser_hash(parser_url); + std::string enclave_hash = JobStep::read_parser_hash(name, parser_url); // 3. call terminus to generate forward message // 3.2 forward algo shu skey spdlog::trace("3.2 forward algo shu skey"); std::string algo_forward_result = - name + - ".algo" + - std::to_string(idx) + - ".shukey.foward.json"; + name + + ".algo" + + std::to_string(idx) + + ".shukey.foward.json"; + // JobStep::mutex.lock(); + spdlog::trace("forward_message algo {}", idx); nlohmann::json algo_forward_json = JobStep::forward_message( - crypto, algo_key_file, pkey, enclave_hash, algo_forward_result); + crypto, algo_key_file, pkey, enclave_hash, algo_forward_result); + // JobStep::mutex.unlock(); all_outputs.push_back(algo_forward_result); + // JobStep::mutex.lock(); // 3.3 forward user shu skey spdlog::trace("3.3 forward user shu skey"); std::string user_forward_result = name + ".user" + std::to_string(idx) + ".shukey.foward.json"; nlohmann::json user_forward_json = JobStep::forward_message( - crypto, user_key_file, pkey, enclave_hash, user_forward_result); + crypto, user_key_file, pkey, enclave_hash, user_forward_result); all_outputs.push_back(user_forward_result); + // JobStep::mutex.unlock(); + // JobStep::mutex.lock(); // handle all data spdlog::trace("handle all data"); if (!prev_tasks_idx.empty()) @@ -221,23 +257,27 @@ nlohmann::json TaskGraph_Job::run( for (size_t i = 0; i < data_urls.size(); i++) { auto iter = data_urls[i]; + spdlog::trace("data_url[0]: {}", data_urls[0]); + spdlog::trace("data_url: {}", data_urls[i]); nlohmann::json result = handle_input_data( - summary, - data_urls[i], - plugin_urls[i], - pkey, enclave_hash, - i, - tasks, - prev_tasks_idx); + summary, + data_urls[i], + plugin_urls[i], + pkey, + enclave_hash, + i, + tasks, + prev_tasks_idx); input_data.push_back(result["data_obj"]); flat_kgt_pkey_list.push_back(result["flat_kgt_pkey"]); } + // JobStep::mutex.unlock(); // 4. call terminus to generate request spdlog::trace("4. call terminus to generate request"); std::string param_output_url = name + "_param" + std::to_string(idx) + ".json"; nlohmann::json param_json = JobStep::generate_request( - crypto, input_param, user_key_file, param_output_url, config); + crypto, input_param, user_key_file, param_output_url, config); summary["analyzer-output"] = param_json["encrypted-input"]; all_outputs.push_back(param_output_url); @@ -246,21 +286,21 @@ nlohmann::json TaskGraph_Job::run( std::string parser_input_file = name + "_parser_input.json"; std::string parser_output_file = name + "_parser_output.json"; nlohmann::json result_json = JobStep::fid_analyzer_tg( - user_shukey_json, - user_forward_json, - algo_shukey_json, - algo_forward_json, - enclave_hash, - input_data, - parser_url, - pkey, - nlohmann::json(), - crypto, - param_json, - flat_kgt_pkey_list, - std::vector(), - parser_input_file, - parser_output_file); + user_shukey_json, + user_forward_json, + algo_shukey_json, + algo_forward_json, + enclave_hash, + input_data, + parser_url, + pkey, + nlohmann::json(), + crypto, + param_json, + flat_kgt_pkey_list, + std::vector(), + parser_input_file, + parser_output_file); summary["encrypted-result"] = result_json["encrypted_result"]; summary["result-signature"] = result_json["result_signature"]; @@ -269,13 +309,16 @@ nlohmann::json TaskGraph_Job::run( ofs_sf << summary.dump(); ofs_sf.close(); all_outputs.push_back(summary_file); - JobStep::remove_files(all_outputs); + + // FIXME: disable remove files as tasks share one all outputs list + // JobStep::remove_files(all_outputs); std::vector key_json_list; for (auto key_file : key_files) { std::ifstream ifs(key_file); key_json_list.push_back(nlohmann::json::parse(ifs)); + ifs.close(); } std::string all_keys_file = name + ".all-keys.json"; std::ofstream ofs_akf(all_keys_file); @@ -289,6 +332,8 @@ nlohmann::json TaskGraph_Job::run( ret["data_kgt_pkey"] = result_json["data_kgt_pkey"]; ret["all_keys_file"] = all_keys_file; + spdlog::trace("run task {} ends", idx); + return ret; } @@ -303,123 +348,156 @@ void gen_kgt() Common::execute_cmd(cmd); } -int main(const int argc, const char *argv[]) { +int main(const int argc, const char *argv[]) +{ jobStep = std::make_unique(); common = std::make_unique(); commonJs = std::make_unique(); spdlog::set_level(spdlog::level::trace); - std::cout << std::filesystem::current_path() << std::endl; - -// std::string crypto = "stdeth"; - -// spdlog::trace("build all_tasks"); -// std::vector all_tasks; - -// nlohmann::json task1; -// task1["name"] = "org_info"; -// task1["data"] = nlohmann::json::array({"corp.csv"}); -// std::string task1_reader = Common::lib_dir / std::filesystem::path("libt_org_info_reader.so"); -// std::string task1_parser = Common::lib_dir / std::filesystem::path("t_org_info_parser.signed.so"); -// std::string task1_param = R"([{"type":"string","value":"91110114787775909K"}])"; -// task1["reader"] = nlohmann::json::array({task1_reader}); -// task1["parser"] = task1_parser; -// task1["param"] = task1_param; -// all_tasks.push_back(task1); - -// nlohmann::json task2; -// task2["name"] = "tax"; -// task2["data"] = nlohmann::json::array({"tax.csv"}); -// std::string task2_reader = Common::lib_dir / std::filesystem::path("libt_tax_reader.so"); -// std::string task2_parser = Common::lib_dir / std::filesystem::path("t_tax_parser.signed.so"); -// std::string task2_param = R"([{"type":"string","value":"91110114787775909K"}])"; -// task2["reader"] = nlohmann::json::array({task2_reader}); -// task2["parser"] = task2_parser; -// task2["param"] = task2_param; -// all_tasks.push_back(task2); - -// nlohmann::json task3; -// task3["name"] = "merge"; -// task3["data"] = nlohmann::json::array({"result_org_info.csv", "result_tax.csv"}); -// std::string task3_reader1 = Common::lib_dir / std::filesystem::path("libt_org_info_reader.so"); -// std::string task3_reader2 = Common::lib_dir / std::filesystem::path("libt_tax_reader.so"); -// std::string task3_parser = Common::lib_dir / std::filesystem::path("t_org_tax_parser.signed.so"); -// // std::string task3_param = "\"[{\\\"type\\\":\\\"string\\\",\\\"value\\\":\\\"91110114787775909K\\\"}]\""; -// std::string task3_param = R"([{"type":"string","value":"91110114787775909K"}])"; - -// task3["reader"] = nlohmann::json::array({task3_reader1, task3_reader2}); -// task3["parser"] = task3_parser; -// task3["param"] = task3_param; -// all_tasks.push_back(task3); - -// nlohmann::json all_tasks = nlohmann::json::parse(R"( -// { -// "task1": { -// "name": "org_info", -// "data": ["corp.csv"], -// "reader": [os.path.join(common.lib_dir, "libt_org_info_reader.so")], -// "parser": os.path.join(common.lib_dir, "t_org_info_parser.signed.so"), -// "param": "\"[{\\\"type\\\":\\\"string\\\",\\\"value\\\":\\\"91110114787775909K\\\"}]\"", -// }, -// "task2": { -// "name": "tax", -// "data": ["tax.csv"], -// "reader": [os.path.join(common.lib_dir, "libt_tax_reader.so")], -// "parser": os.path.join(common.lib_dir, "t_tax_parser.signed.so"), -// "param": "\"[{\\\"type\\\":\\\"string\\\",\\\"value\\\":\\\"91110114787775909K\\\"}]\"", -// }, -// "task3": { -// "name": "merge", -// "data": ["result_org_info.csv", "result_tax.csv"], -// "reader": [os.path.join(common.lib_dir, "libt_org_info_reader.so"), os.path.join(common.lib_dir, "libt_tax_reader.so")], -// "parser": os.path.join(common.lib_dir, "t_org_tax_parser.signed.so"), -// "param": "\"[{\\\"type\\\":\\\"string\\\",\\\"value\\\":\\\"91110114787775909K\\\"}]\"", -// } -// } -// )"); - -// spdlog::trace("build config"); -// nlohmann::json config; -// config["request-use-js"] = "true"; -// config["remove-files"] = "true"; - -// spdlog::trace("build taskgraph job"); -// TaskGraph_Job tj( -// crypto, -// all_tasks, -// std::vector(), -// config, -// std::vector()); -// spdlog::trace("run job0"); -// tj.run(all_tasks, 0, std::vector()); -// spdlog::trace("run job1"); -// tj.run(all_tasks, 1, std::vector()); -// spdlog::trace("run job2"); -// nlohmann::json result = tj.run(all_tasks, 2, std::vector{0, 1}); -// std::string result_file = "taskgraph.result.output"; -// -// std::string enc_res = result["encrypted_result"]; -// std::string kgt_pkey = result["data_kgt_pkey"]; -// std::string all_keys_file = result["all_keys_file"]; -// nlohmann::json dec_res = -// decrypt_result(crypto, enc_res, kgt_pkey, all_keys_file, result_file); -// spdlog::info(dec_res["output"]); + std::cout << std::filesystem::current_path() << std::endl; + + std::string crypto = "stdeth"; + + spdlog::trace("build all_tasks"); + std::vector all_tasks; + + nlohmann::json task1; + task1["name"] = "org_info"; + task1["data"] = nlohmann::json::array({"corp.csv"}); + std::string task1_reader = Common::lib_dir / std::filesystem::path("libt_org_info_reader.so"); + std::string task1_parser = Common::lib_dir / std::filesystem::path("t_org_info_parser.signed.so"); + std::string task1_param = R"([{"type":"string","value":"91110114787775909K"}])"; + task1["reader"] = nlohmann::json::array({task1_reader}); + task1["parser"] = task1_parser; + task1["param"] = task1_param; + all_tasks.push_back(task1); + + nlohmann::json task2; + task2["name"] = "tax"; + task2["data"] = nlohmann::json::array({"tax.csv"}); + std::string task2_reader = Common::lib_dir / std::filesystem::path("libt_tax_reader.so"); + std::string task2_parser = Common::lib_dir / std::filesystem::path("t_tax_parser.signed.so"); + std::string task2_param = R"([{"type":"string","value":"91110114787775909K"}])"; + task2["reader"] = nlohmann::json::array({task2_reader}); + task2["parser"] = task2_parser; + task2["param"] = task2_param; + all_tasks.push_back(task2); + + nlohmann::json task3; + task3["name"] = "merge"; + task3["data"] = nlohmann::json::array({"result_org_info.csv", "result_tax.csv"}); + std::string task3_reader1 = Common::lib_dir / std::filesystem::path("libt_org_info_reader.so"); + std::string task3_reader2 = Common::lib_dir / std::filesystem::path("libt_tax_reader.so"); + std::string task3_parser = Common::lib_dir / std::filesystem::path("t_org_tax_parser.signed.so"); + // std::string task3_param = "\"[{\\\"type\\\":\\\"string\\\",\\\"value\\\":\\\"91110114787775909K\\\"}]\""; + std::string task3_param = R"([{"type":"string","value":"91110114787775909K"}])"; + + task3["reader"] = nlohmann::json::array({task3_reader1, task3_reader2}); + task3["parser"] = task3_parser; + task3["param"] = task3_param; + all_tasks.push_back(task3); + + // nlohmann::json all_tasks = nlohmann::json::parse(R"( + // { + // "task1": { + // "name": "org_info", + // "data": ["corp.csv"], + // "reader": [os.path.join(common.lib_dir, "libt_org_info_reader.so")], + // "parser": os.path.join(common.lib_dir, "t_org_info_parser.signed.so"), + // "param": "\"[{\\\"type\\\":\\\"string\\\",\\\"value\\\":\\\"91110114787775909K\\\"}]\"", + // }, + // "task2": { + // "name": "tax", + // "data": ["tax.csv"], + // "reader": [os.path.join(common.lib_dir, "libt_tax_reader.so")], + // "parser": os.path.join(common.lib_dir, "t_tax_parser.signed.so"), + // "param": "\"[{\\\"type\\\":\\\"string\\\",\\\"value\\\":\\\"91110114787775909K\\\"}]\"", + // }, + // "task3": { + // "name": "merge", + // "data": ["result_org_info.csv", "result_tax.csv"], + // "reader": [os.path.join(common.lib_dir, "libt_org_info_reader.so"), os.path.join(common.lib_dir, "libt_tax_reader.so")], + // "parser": os.path.join(common.lib_dir, "t_org_tax_parser.signed.so"), + // "param": "\"[{\\\"type\\\":\\\"string\\\",\\\"value\\\":\\\"91110114787775909K\\\"}]\"", + // } + // } + // )"); + + spdlog::trace("build config"); + nlohmann::json config; + config["request-use-js"] = "true"; + config["remove-files"] = "false"; + + spdlog::trace("build taskgraph job"); + TaskGraph_Job tj( + crypto, + all_tasks, + std::vector(), + config, + std::vector()); + + nlohmann::json result; + + nlohmann::json key_0 = JobStep::get_first_key(crypto); + nlohmann::json key_1 = JobStep::get_first_key(crypto); + nlohmann::json key_2 = JobStep::get_first_key(crypto); + + // create a thread with code to execute + // std::thread t1([&] () { + // spdlog::trace("run job0"); + // tj.run(all_tasks, 0, std::vector(), key_0); + // }); + // std::thread t2([&] () { + // spdlog::trace("run job1"); + // tj.run(all_tasks, 1, std::vector(), key_1); + // }); + // t1.join(); + // t2.join(); tf::Executor executor; tf::Taskflow taskflow; - auto [task_0, task_1, task_2] = taskflow.emplace( // create four tasks - [&] () { std::cout << "Task0 starts \n"; Common::execute_cmd("python3 taskgraph_job_0.py 0"); std::cout << "Task0 ends \n";}, - [&] () { std::cout << "Task1 starts \n"; Common::execute_cmd("python3 taskgraph_job_1.py 1"); std::cout << "Task1 ends \n";}, - [&] () { std::cout << "Task2 starts \n"; Common::execute_cmd("python3 taskgraph_job_2.py 2"); std::cout << "Task2 ends \n";} - ); -// task_1.succeed(task_0); - task_2.succeed(task_0, task_1); - executor.run(taskflow).wait(); - - // Common::execute_cmd("python3 taskgraph_job_0.py 0"); - // Common::execute_cmd("python3 taskgraph_job_1.py 1"); - // Common::execute_cmd("python3 taskgraph_job_2.py 2"); - - return 0; + auto [task_0, task_1, task_2] = taskflow.emplace( // create tasks + [&] () { + spdlog::trace("run job0"); + tj.run(all_tasks, 0, std::vector(), key_0); + }, + [&] () { + spdlog::trace("run job1"); + tj.run(all_tasks, 1, std::vector(), key_1); + }, + [&] () { + spdlog::trace("run job2"); + result = tj.run(all_tasks, 2, std::vector{0, 1}, key_2); + std::cout << result << std::endl; + } + ); + // task_0.succeed(task_1); + task_2.succeed(task_0, task_1); + executor.run(taskflow).wait(); + + // Common::execute_cmd("python3 taskgraph_job_0.py 0"); + // Common::execute_cmd("python3 taskgraph_job_1.py 1"); + // Common::execute_cmd("python3 taskgraph_job_2.py 2"); + + std::string result_file = "taskgraph.result.output"; + + std::string enc_res = result["encrypted_result"]; + std::string kgt_pkey = result["data_kgt_pkey"]; + std::string all_keys_file = result["all_keys_file"]; + nlohmann::json dec_res = + decrypt_result(crypto, enc_res, kgt_pkey, all_keys_file, result_file); + + std::string line; + std::ifstream ifs; + + ifs.open(result_file); + while (ifs >> line) + { + std::cout << line << std::endl; + } + ifs.close(); + + return 0; } diff --git a/test/toolkit/scheduler/test_scheduler.hpp b/test/toolkit/scheduler/test_scheduler.hpp index 3df7330f..842285ef 100755 --- a/test/toolkit/scheduler/test_scheduler.hpp +++ b/test/toolkit/scheduler/test_scheduler.hpp @@ -43,7 +43,8 @@ namespace cluster { nlohmann::json run( std::vector tasks, uint64_t idx, - std::vector prev_tasks_idx); + std::vector prev_tasks_idx, + nlohmann::json key); public: std::string crypto; diff --git a/toolkit/analyzer/main.cpp b/toolkit/analyzer/main.cpp index 8cfbed11..045f997c 100644 --- a/toolkit/analyzer/main.cpp +++ b/toolkit/analyzer/main.cpp @@ -28,14 +28,16 @@ int main(int argc, char *argv[]) { vm["input"].as()); g_parser = std::make_shared(input_param); - std::cout << "start to parse" << std::endl; + std::cout << "parse starts" << std::endl; g_parser->parse(); + std::cout << "parse ends" << std::endl; std::string output_fp = vm["output"].as(); try { std::ofstream os(output_fp, std::ios::out | std::ios::binary); const std::string &res = g_parser->get_result_str(); os.write(res.data(), res.size()); + os.close(); } catch (const std::exception &e) { std::cerr << "cannot open " << output_fp << std::endl; return 1; diff --git a/toolkit/analyzer/parsers/tg_parser.cpp b/toolkit/analyzer/parsers/tg_parser.cpp index 1ef0fe42..cc0d200f 100644 --- a/toolkit/analyzer/parsers/tg_parser.cpp +++ b/toolkit/analyzer/parsers/tg_parser.cpp @@ -96,6 +96,7 @@ uint32_t parser::parse() { << ypc::status_string(ret); return ret; } + auto param_var = m_param.get(); typename ypc::cast_obj_to_package::type param_pkg = param_var; auto param_bytes = ypc::make_bytes::for_package(param_pkg); @@ -122,6 +123,7 @@ uint32_t parser::parse() { LOG(ERROR) << "get_analyze_result got error " << ypc::status_string(ret); return ret; } + ret = dump_result(res); if (ret != 0u) { LOG(ERROR) << "dump_result got error " << ypc::status_string(ret); @@ -170,6 +172,7 @@ uint32_t parser::feed_datasource() { LOG(WARNING) << "only need 1 input, ignore other inputs"; } } + if (m_ptype.d.data_source_type == ypc::utc::multi_sealed_datasource_parser) { if (input_data_var.empty()) { LOG(ERROR) << "missing input, require at least one input data source"; @@ -181,6 +184,7 @@ uint32_t parser::feed_datasource() { std::vector all_data_info; for (auto item : input_data_var) { auto url = item.get(); + LOG(INFO) << url; auto data_hash = item.get(); auto ssf = std::make_shared(url, true); m_data_sources.insert(std::make_pair(data_hash, ssf)); @@ -206,6 +210,7 @@ uint32_t parser::feed_datasource() { data_hash, shu_info.get(), item.get()); all_data_info.push_back(data_info.make_copy()); } + ypc::bytes data_info_bytes; if (m_ptype.d.data_source_type == ypc::utc::single_sealed_datasource_parser) { if (all_data_info.empty()) { @@ -226,6 +231,7 @@ uint32_t parser::feed_datasource() { if (m_ptype.d.data_source_type == ypc::utc::raw_datasource_parser) { data_info_bytes = all_data_info[0].get(); } + auto ret = m_parser->init_data_source(data_info_bytes); if (ret != 0u) { LOG(ERROR) << "init_data_source got error " << ypc::status_string(ret); diff --git a/toolkit/terminus/cmd/cterminus/CMakeLists.txt b/toolkit/terminus/cmd/cterminus/CMakeLists.txt index 21f23d64..44d8d53d 100644 --- a/toolkit/terminus/cmd/cterminus/CMakeLists.txt +++ b/toolkit/terminus/cmd/cterminus/CMakeLists.txt @@ -14,6 +14,24 @@ target_include_directories(yterminus PUBLIC "$" "$" ) + +add_library(yterminus_lib lib.cpp + allowance.cpp + cmd_line.cpp + crypto.cpp + forward.cpp + gen_key.cpp + helper.cpp + relay.cpp + request.cpp + sign.cpp) +set_property(TARGET yterminus_lib PROPERTY OUTPUT_NAME yterminus) +#set_target_properties(yterminus_lib PROPERTIES +# LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib) +target_link_libraries(yterminus_lib terminus glog boost_program_options boost_system core) +install(TARGETS yterminus_lib + DESTINATION "${lib_install_dir}") + install(TARGETS yterminus DESTINATION "${bin_install_dir}") AddClangTidy(yterminus) diff --git a/toolkit/terminus/cmd/cterminus/lib.cpp b/toolkit/terminus/cmd/cterminus/lib.cpp new file mode 100644 index 00000000..03601628 --- /dev/null +++ b/toolkit/terminus/cmd/cterminus/lib.cpp @@ -0,0 +1,16 @@ +#include "lib.hpp" + +int yterminus_cmdline(int argc, char *argv[]) +{ + // TODO + return 0; +} + +int yterminus( + const std::string& etype, + const std::string& enclave_path, + const std::string& output_path) +{ + // TODO + return 0; +} \ No newline at end of file diff --git a/toolkit/terminus/cmd/cterminus/lib.hpp b/toolkit/terminus/cmd/cterminus/lib.hpp new file mode 100644 index 00000000..3f3664f5 --- /dev/null +++ b/toolkit/terminus/cmd/cterminus/lib.hpp @@ -0,0 +1,13 @@ +#ifndef YEEZ_PRIVACY_COMPUTING_YTERMINUS_LIB_H +#define YEEZ_PRIVACY_COMPUTING_YTERMINUS_LIB_H + +#include "cmd_line.h" + +int yterminus_cmdline(int argc, char *argv[]); + +int yterminus( + const std::string& etype = "parser", + const std::string& enclave_path = "", + const std::string& output_path = ""); + +#endif //YEEZ_PRIVACY_COMPUTING_YTERMINUS_LIB_H