diff --git a/fuzzware_pipeline/__init__.py b/fuzzware_pipeline/__init__.py
index a9f6fd5..842ac01 100644
--- a/fuzzware_pipeline/__init__.py
+++ b/fuzzware_pipeline/__init__.py
@@ -6,6 +6,7 @@
 import subprocess
 import sys
 import time
+import multiprocessing as mp
 from datetime import datetime
 
 from . import naming_conventions as nc
@@ -734,6 +735,34 @@ def do_gentraces(args, leftover_args):
     STATNAME_COV, STATNAME_MMIO_COSTS, STATNAME_MMIO_OVERHEAD_ELIM,
     STATNAME_CRASH_CONTEXTS, STATNAME_CRASH_TIMINGS
 ]
+
+
+def multi_run_target_manager(args_list: list):
+    """Run ``run_target_wrapper`` over ``args_list`` in a worker pool.
+
+    Each element of ``args_list`` is the positional-argument sequence for one
+    ``run_target_wrapper`` call. Returns the per-call results in input order;
+    an exception raised in a worker is re-raised here by ``starmap``.
+    """
+    with mp.Pool() as pool:
+        return pool.starmap(run_target_wrapper, args_list)
+
+
+def run_target_wrapper(config_path, input_path, extra_args, get_output=False, silent=False, result_queue=None):
+    """Run the target on one input and return ``(input_path, output)``.
+
+    ``output`` is ``str()`` of whatever ``run_target`` returns. When
+    ``result_queue`` is not None, the same tuple is also put on it, so the
+    wrapper works both with and without a queue.
+    """
+    from .run_target import run_target
+
+    result = (input_path, str(run_target(config_path, input_path, extra_args, get_output=get_output, silent=silent)))
+    if result_queue is not None:
+        result_queue.put(result)
+    return result
+
+
 def do_genstats(args, leftover_args):
     from .util.config import load_config_deep
     from .workers.tracegen import gen_all_missing_traces
@@ -890,15 +919,24 @@ def do_genstats(args, leftover_args):
         for main_dir in main_dirs_for_proj(projdir):
             logger.info(f"Crash contexts from main dir: {main_dir}")
             config_path = None
-
+            # Collect one run_target_wrapper argument tuple per crashing
+            # input, then emulate them all in parallel.
+            args_tuple_list = []
             for crashing_input in nc.crash_paths_for_main_dir(main_dir):
                 if config_path is None:
                     config_path = config_for_input_path(crashing_input)
                     extra_args_file = extra_args_for_config_path(config_path)
                     extra_args = parse_extra_args(load_extra_args(extra_args_file), projdir)
                     if "-v" not in extra_args:
                         extra_args += ["-v"]
-                emu_output = str(run_target(config_path, crashing_input, extra_args, get_output=True, silent=True))
+                args_tuple_list.append((config_path, crashing_input, extra_args, True, True))
+
+            # Run the pool from this process: starmap re-raises worker
+            # exceptions here instead of leaving us blocked on a queue that
+            # will never be filled.
+            results_dict = dict(multi_run_target_manager(args_tuple_list)) if args_tuple_list else {}
+
+            for crashing_input, emu_output in results_dict.items():
                 pc, lr = pc_lr_from_emu_output(emu_output)
 
                 crashing_input = str(Path(crashing_input).relative_to(projdir))
diff --git a/fuzzware_pipeline/workers/tracegen.py b/fuzzware_pipeline/workers/tracegen.py
index 04c565e..c5ea4bb 100644
--- a/fuzzware_pipeline/workers/tracegen.py
+++ b/fuzzware_pipeline/workers/tracegen.py
@@ -6,7 +6,9 @@
 import time
 import uuid
 from pathlib import Path
+import multiprocessing as mp
+from queue import Empty
 
 import rq
 from fuzzware_pipeline.logging_handler import logging_handler
 from rq.worker import WorkerStatus
@@ -16,14 +18,15 @@
 from ..util.config import load_extra_args, parse_extra_args
 
 logger = logging_handler().get_logger("tracegen")
 
+MULTI = True
 FORKSRV_FD = 198
 # Make sure these names are synchronized with the argument names below
 ARGNAME_BBL_SET_PATH, ARGNAME_MMIO_SET_PATH = "bbl_set_path", "mmio_set_path"
 ARGNAME_EXTRA_ARGS = "extra_args"
 FORKSERVER_UNSUPPORTED_TRACE_ARGS = ("mmio_trace_path", "bbl_trace_path", "ram_trace_path")
 
-def gen_traces(config_path, input_path, bbl_trace_path=None, ram_trace_path=None, mmio_trace_path=None, bbl_set_path=None, mmio_set_path=None, extra_args=None, silent=False, bbl_hash_path=None):
+def gen_traces(config_path, input_path, bbl_trace_path=None, ram_trace_path=None, mmio_trace_path=None, bbl_set_path=None, mmio_set_path=None, extra_args=None, silent=False, bbl_hash_path=None, queue=None):
     extra_args = list(extra_args) if extra_args else []
 
     if bbl_trace_path is not None:
@@ -40,6 +43,10 @@ def gen_traces(config_path, input_path, bbl_trace_path=None, ram_trace_path=None
         extra_args += ["--bb-hash-out", bbl_hash_path]
 
     run_target(config_path, input_path, extra_args, silent=silent, stdout=subprocess.DEVNULL if silent else None, stderr=subprocess.DEVNULL if silent else None)
+    # Report one finished job on the progress queue, if the caller passed one.
+    # Callers that do not pass a queue get the default None and skip this.
+    if queue is not None:
+        queue.put(1)
     return True
 
 def batch_gen_native_traces(config_path, input_paths, extra_args=None, bbl_set_paths=None, mmio_set_paths=None, bbl_hash_paths=None, silent=False):
@@ -67,6 +74,16 @@ def batch_gen_native_traces(config_path, input_paths, extra_args=None, bbl_set_p
 
     gentrace_proc.destroy()
 
+def multi_proc_manager(function=None, arg_tuple_list=()):
+    """Call ``function(*args)`` for every tuple in ``arg_tuple_list`` using a worker pool.
+
+    Used as the target of a helper process, so that the caller can keep
+    consuming progress notifications while the pool is busy.
+    """
+    with mp.Pool() as p:
+        p.starmap(function, arg_tuple_list)
+
+
 def gen_missing_maindir_traces(maindir, required_trace_prefixes, fuzzer_nums=None, tracedir_postfix="", log_progress=False, verbose=False, crashing_inputs=False):
     projdir = nc.project_base(maindir)
     config_path = nc.config_file_for_main_path(maindir)
@@ -143,21 +160,38 @@ def gen_missing_maindir_traces(maindir, required_trace_prefixes, fuzzer_nums=Non
             if log_progress:
                 logger.info(f"Generating traces took {time.time() - start_time:.02f} seconds for {len(input_paths)} input(s)")
         else:
+            m = mp.Manager()
+            processed_queue = m.Queue()
             num_processed = 0
+            job_args_multi = []
             for input_path, bbl_trace_path, ram_trace_path, mmio_trace_path, bbl_set_path, mmio_set_path, bbl_hash_path in jobs_for_config:
-                gen_traces(str(config_path), str(input_path),
-                    bbl_trace_path=bbl_trace_path, ram_trace_path=ram_trace_path, mmio_trace_path=mmio_trace_path,
-                    bbl_set_path=bbl_set_path, mmio_set_path=mmio_set_path, bbl_hash_path=bbl_hash_path,
-                    extra_args=extra_args, silent=not verbose
-                )
-                num_processed += 1
-
+                # Positional order must match the gen_traces signature.
+                job_args_multi.append((str(config_path), str(input_path), bbl_trace_path, ram_trace_path, mmio_trace_path,
+                                       bbl_set_path, mmio_set_path, extra_args, not verbose, bbl_hash_path, processed_queue))
+            num_jobs = len(job_args_multi)
+            # Run the pool in a helper process so this loop can report progress
+            # while traces are still being generated.
+            p = mp.Process(target=multi_proc_manager, args=(gen_traces, job_args_multi))
+            p.start()
+            while num_processed < num_jobs:
+                try:
+                    num_processed += processed_queue.get(timeout=1)
+                except Empty:
+                    if p.is_alive():
+                        # A single trace may take arbitrarily long; keep waiting.
+                        continue
+                    # The helper has exited; stop waiting. Success is checked
+                    # through its exit code below.
+                    break
                 if log_progress:
                     if num_processed > 0 and num_processed % 50 == 0:
                         time_passed = round(time.time() - start_time)
                         relative_done = (num_processed+1) / num_gentrace_jobs
                         time_estimated = round((relative_done ** (-1)) * time_passed)
                         logger.info(f"[*] Processed {num_processed}/{num_gentrace_jobs} in {time_passed} seconds. Estimated seconds remaining: {time_estimated-time_passed}")
+            p.join()
+            m.shutdown()
+            if p.exitcode != 0:
+                raise RuntimeError(f"Trace generation helper process exited with code {p.exitcode}")
 
 def gen_all_missing_traces(projdir, trace_name_prefixes=None, log_progress=False, verbose=False, crashing_inputs=False):
     if trace_name_prefixes is None: