Some Multiprocessing Added #2

Open

wants to merge 4 commits into base: main
42 changes: 40 additions & 2 deletions fuzzware_pipeline/__init__.py
@@ -6,6 +6,7 @@
import subprocess
import sys
import time
import multiprocessing as mp
from datetime import datetime

from . import naming_conventions as nc
@@ -734,6 +735,20 @@ def do_gentraces(args, leftover_args):
STATNAME_COV, STATNAME_MMIO_COSTS, STATNAME_MMIO_OVERHEAD_ELIM,
STATNAME_CRASH_CONTEXTS, STATNAME_CRASH_TIMINGS
]


def multi_run_target_manager(args_list: list):
with mp.Pool() as pool:
pool.starmap(run_target_wrapper, args_list)


def run_target_wrapper(config_path, input_path, extra_args, get_output=False, silent=False, result_queue=None):
from .run_target import run_target
# emu_output = str(run_target(config_path, crashing_input, extra_args, get_output=True, silent=True))
result_queue.put(
(input_path, str(run_target(config_path, input_path, extra_args, get_output=get_output, silent=silent))))


def do_genstats(args, leftover_args):
from .util.config import load_config_deep
from .workers.tracegen import gen_all_missing_traces
@@ -890,15 +905,37 @@ def do_genstats(args, leftover_args):
for main_dir in main_dirs_for_proj(projdir):
logger.info(f"Crash contexts from main dir: {main_dir}")
config_path = None

results_dict = {}
args_tuple_list = []
m = mp.Manager()
results_queue = m.Queue()
iterator = 0
num_processed = 0
for crashing_input in nc.crash_paths_for_main_dir(main_dir):
# continue
if config_path is None:
config_path = config_for_input_path(crashing_input)
extra_args_file = extra_args_for_config_path(config_path)
extra_args = parse_extra_args(load_extra_args(extra_args_file), projdir)
if "-v" not in extra_args:
extra_args += ["-v"]
emu_output = str(run_target(config_path, crashing_input, extra_args, get_output=True, silent=True))
# emu_output = str(run_target(config_path, crashing_input, extra_args, get_output=True, silent=True))
args_tuple_list.append([config_path, crashing_input, extra_args, True, True, results_queue])
iterator += 1

p = mp.Process(target=multi_run_target_manager, args=(args_tuple_list,))
Contributor

I think mp.Process does not limit the number of concurrent processes. We want to be able to limit this number so that other fuzzing runs are not impacted by the trace generation taking up all computation.

multiprocessing.Pool should be able to limit this number.

Given that, we can add a -n argument, similar to the fuzzware pipeline's -n <num_cores> argument, which allows setting the maximum number of cores/parallel processes used by trace generation.
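
A minimal sketch of that suggestion, assuming a hypothetical -n/--num-instances option on the genstats subcommand (the option name and the plumbing are illustrative, not existing pipeline API; run_target_wrapper is the helper already added in this file):

import multiprocessing as mp

def multi_run_target_manager(args_list: list, num_workers=None):
    # Pool(processes=num_workers) caps the number of concurrent worker
    # processes; None falls back to os.cpu_count(), i.e. the current behavior.
    with mp.Pool(processes=num_workers) as pool:
        pool.starmap(run_target_wrapper, args_list)

# Hypothetical wiring of the new option (names are placeholders):
# parser.add_argument("-n", "--num-instances", type=int, default=None,
#                     help="max number of parallel processes for crash context generation")
# ...
# p = mp.Process(target=multi_run_target_manager,
#                args=(args_tuple_list, args.num_instances))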

p.start()
while num_processed < iterator:
result = results_queue.get(block=True)
results_dict[result[0]] = result[1]
num_processed += 1
p.join()
# print(results_dict)
# exit()

for crashing_input in nc.crash_paths_for_main_dir(main_dir):

emu_output = results_dict[crashing_input]
pc, lr = pc_lr_from_emu_output(emu_output)
crashing_input = str(Path(crashing_input).relative_to(projdir))

@@ -909,6 +946,7 @@
crash_contexts.setdefault((pc, lr), []).append(crashing_input)
logger.info(f"Got (pc, lr) = ({pc:#010x}, {lr:#010x}) for the following input path: {crashing_input}")


crash_context_out_path = os.path.join(projdir, nc.PIPELINE_DIRNAME_STATS, nc.STATS_FILENAME_CRASH_CONTEXTS)
dump_crash_contexts(crash_context_out_path, crash_contexts)

36 changes: 24 additions & 12 deletions fuzzware_pipeline/workers/tracegen.py
@@ -6,7 +6,7 @@
import time
import uuid
from pathlib import Path

import multiprocessing as mp
import rq
from fuzzware_pipeline.logging_handler import logging_handler
from rq.worker import WorkerStatus
@@ -16,14 +16,14 @@
from ..util.config import load_extra_args, parse_extra_args

logger = logging_handler().get_logger("tracegen")

MULTI = True
FORKSRV_FD = 198

# Make sure these names are synchronized with the argument names below
ARGNAME_BBL_SET_PATH, ARGNAME_MMIO_SET_PATH = "bbl_set_path", "mmio_set_path"
ARGNAME_EXTRA_ARGS = "extra_args"
FORKSERVER_UNSUPPORTED_TRACE_ARGS = ("mmio_trace_path", "bbl_trace_path", "ram_trace_path")
def gen_traces(config_path, input_path, bbl_trace_path=None, ram_trace_path=None, mmio_trace_path=None, bbl_set_path=None, mmio_set_path=None, extra_args=None, silent=False, bbl_hash_path=None):
def gen_traces(config_path, input_path, bbl_trace_path=None, ram_trace_path=None, mmio_trace_path=None, bbl_set_path=None, mmio_set_path=None, extra_args=None, silent=False, bbl_hash_path=None,queue=None):
extra_args = list(extra_args) if extra_args else []

if bbl_trace_path is not None:
@@ -40,6 +40,7 @@ def gen_traces(config_path, input_path, bbl_trace_path=None, ram_trace_path=None
extra_args += ["--bb-hash-out", bbl_hash_path]

run_target(config_path, input_path, extra_args, silent=silent, stdout=subprocess.DEVNULL if silent else None, stderr=subprocess.DEVNULL if silent else None)
queue.put(1)
return True

def batch_gen_native_traces(config_path, input_paths, extra_args=None, bbl_set_paths=None, mmio_set_paths=None, bbl_hash_paths=None, silent=False):
@@ -67,6 +68,11 @@ def batch_gen_native_traces(config_path, input_paths, extra_args=None, bbl_set_p

gentrace_proc.destroy()

def multi_proc_manager(function=None,arg_tuple_list=[]):
with mp.Pool() as p:
p.starmap(function, arg_tuple_list)


def gen_missing_maindir_traces(maindir, required_trace_prefixes, fuzzer_nums=None, tracedir_postfix="", log_progress=False, verbose=False, crashing_inputs=False):
projdir = nc.project_base(maindir)
config_path = nc.config_file_for_main_path(maindir)
@@ -143,21 +149,27 @@ def gen_missing_maindir_traces(maindir, required_trace_prefixes, fuzzer_nums=Non
if log_progress:
logger.info(f"Generating traces took {time.time() - start_time:.02f} seconds for {len(input_paths)} input(s)")
else:
m = mp.Manager()
processed_queue=m.Queue()
num_processed = 0
job_args_multi = []
iteration = 0
for input_path, bbl_trace_path, ram_trace_path, mmio_trace_path, bbl_set_path, mmio_set_path, bbl_hash_path in jobs_for_config:
gen_traces(str(config_path), str(input_path),
bbl_trace_path=bbl_trace_path, ram_trace_path=ram_trace_path, mmio_trace_path=mmio_trace_path,
bbl_set_path=bbl_set_path, mmio_set_path=mmio_set_path, bbl_hash_path=bbl_hash_path,
extra_args=extra_args, silent=not verbose
)
num_processed += 1

job_args_multi.append((str(config_path), str(input_path), bbl_trace_path, ram_trace_path, mmio_trace_path,
bbl_set_path, mmio_set_path, extra_args, not verbose, bbl_hash_path, processed_queue))
iteration += 1
p = mp.Process(target=multi_proc_manager, args=(gen_traces,job_args_multi))
Contributor

Same here. We want to be able to limit the number of cores/parallel processes used.
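
A corresponding sketch for tracegen.py, again assuming the worker count is threaded through (e.g. as a num_workers parameter fed by the hypothetical -n option; the parameter name is illustrative):

import multiprocessing as mp

def multi_proc_manager(function, arg_tuple_list, num_workers=None):
    # Bound the pool size so trace generation does not occupy every core;
    # None keeps the default of os.cpu_count() workers.
    with mp.Pool(processes=num_workers) as p:
        p.starmap(function, arg_tuple_list)

# Caller side, inside gen_missing_maindir_traces (num_workers passed down
# from the command-line option):
# p = mp.Process(target=multi_proc_manager,
#                args=(gen_traces, job_args_multi, num_workers))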

p.start()
while num_processed < iteration:
num_processed+=processed_queue.get(block=True,timeout=100)
if log_progress:
if num_processed > 0 and num_processed % 50 == 0:
time_passed = round(time.time() - start_time)
relative_done = (num_processed+1) / num_gentrace_jobs
relative_done = (num_processed + 1) / num_gentrace_jobs
time_estimated = round((relative_done ** (-1)) * time_passed)
logger.info(f"[*] Processed {num_processed}/{num_gentrace_jobs} in {time_passed} seconds. Estimated seconds remaining: {time_estimated-time_passed}")
logger.info(
f"[*] Processed {num_processed}/{num_gentrace_jobs} in {time_passed} seconds. Estimated seconds remaining: {time_estimated - time_passed}")
p.join()

def gen_all_missing_traces(projdir, trace_name_prefixes=None, log_progress=False, verbose=False, crashing_inputs=False):
if trace_name_prefixes is None: