diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d778ad5b..09cbb9ac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,7 @@ on: types: [opened, synchronize, reopened] jobs: test: - name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} + name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ matrix.threads }} threads - ${{ github.event_name }} runs-on: ${{ matrix.os }} strategy: fail-fast: false @@ -22,10 +22,18 @@ jobs: - windows-latest arch: - x64 + threads: + - 1 + - 2 include: - version: '1' os: macos-latest arch: arm64 + threads: 1 + - version: '1' + os: macos-latest + arch: arm64 + threads: 2 steps: - name: Set git to use LF run: | @@ -48,7 +56,10 @@ jobs: ${{ runner.os }}-test- ${{ runner.os }}- - uses: julia-actions/julia-buildpkg@v1 - - uses: julia-actions/julia-runtest@v1 + - name: Run tests with ${{ matrix.threads }} threads + uses: julia-actions/julia-runtest@v1 + env: + JULIA_NUM_THREADS: ${{ matrix.threads }} - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v1 with: diff --git a/src/run_solver.jl b/src/run_solver.jl index 608b8afe..f48353aa 100644 --- a/src/run_solver.jl +++ b/src/run_solver.jl @@ -1,4 +1,5 @@ export solve_problems +using Base.Threads """ solve_problems(solver, solver_name, problems; kwargs...) @@ -13,6 +14,7 @@ Apply a solver to a set of problems. CUTEst problems). #### Keyword arguments +* `use_threads::Bool`: whether to use threads (default: `true`); * `solver_logger::AbstractLogger`: logger wrapping the solver call (default: `NullLogger`); * `reset_problem::Bool`: reset the problem's counters before solving (default: `true`); * `skipif::Function`: function to be applied to a problem and return whether to skip it @@ -31,6 +33,7 @@ function solve_problems( solver, solver_name::TName, problems; + use_threads::Bool = true, solver_logger::AbstractLogger = NullLogger(), reset_problem::Bool = true, skipif::Function = x -> false, @@ -46,13 +49,16 @@ function solve_problems( :dual_feas, :primal_feas, ], - info_hdr_override::Dict{Symbol, String} = Dict{Symbol, String}(:solver_name => "Solver"), + info_hdr_override::Dict{Symbol, String} = Dict(:solver_name => "Solver"), prune::Bool = true, kwargs..., ) where {TName} + + # Collect information about counters f_counters = collect(fieldnames(Counters)) fnls_counters = collect(fieldnames(NLSCounters))[2:end] # Excludes :counters ncounters = length(f_counters) + length(fnls_counters) + types = [ TName Int @@ -88,27 +94,49 @@ function solve_problems( ] stats = DataFrame(names .=> [T[] for T in types]) + # Thread-safe mechanisms + stats_lock = ReentrantLock() # Lock for modifying shared data structures + first_problem = Atomic{Bool}(true) # Atomic for safe interaction + # specific = Atomic{Vector{Symbol}}([]) # Use atomic for thread-safe updates specific = Symbol[] + nb_unsuccessful_since_start = Atomic{Int}(0) + # Prepare DataFrame columns for logging col_idx = indexin(colstats, names) - - first_problem = true - nb_unsuccessful_since_start = 0 @info log_header(colstats, types[col_idx], hdr_override = info_hdr_override) - for (id, problem) in enumerate(problems) + # Convert problems to an indexable vector + problem_list = collect(problems) + num_problems = length(problem_list) + + # Function to safely push data to the DataFrame + function safe_push!(data_entry) + lock(stats_lock) do + push!(stats, data_entry) + end + end + + # Function to process a single problem + function process_problem(idx) + problem = problem_list[idx] + + # Reset the problem, if requested if reset_problem reset!(problem) end + + # Problem metadata nequ = problem isa AbstractNLSModel ? problem.nls_meta.nequ : 0 - problem_info = [id; problem.meta.name; problem.meta.nvar; problem.meta.ncon; nequ] + problem_info = [idx; problem.meta.name; problem.meta.nvar; problem.meta.ncon; nequ] + + # Determine if the problem should be skipped skipthis = skipif(problem) + # Check if this problem should be skipped if skipthis - if first_problem && !prune - nb_unsuccessful_since_start += 1 + if first_problem[] && !prune + atomic_add!(nb_unsuccessful_since_start, 1) end - prune || push!( - stats, + prune || safe_push!( [ solver_name problem_info @@ -124,24 +152,31 @@ function solve_problems( ], ) finalize(problem) + return else try s = with_logger(solver_logger) do solver(problem; kwargs...) end - if first_problem - for (k, v) in s.solver_specific - if !(typeof(v) <: AbstractVector) - insertcols!( - stats, - ncol(stats) + 1, - k => Vector{Union{typeof(v), Missing}}(undef, nb_unsuccessful_since_start), - ) - push!(specific, k) + + # Handle first problem (thread-safe updates) + if first_problem[] + first_problem[] = false + lock(stats_lock) do + for (k, v) in s.solver_specific + if !(typeof(v) <: AbstractVector) + insertcols!( + stats, + ncol(stats) + 1, + k => Vector{Union{typeof(v), Missing}}(undef, nb_unsuccessful_since_start[]), + ) + push!(specific, k) + end end end - first_problem = false end + + # Collect counters counters_list = problem isa AbstractNLSModel ? [getfield(problem.counters.counters, f) for f in f_counters] : @@ -149,8 +184,9 @@ function solve_problems( nls_counters_list = problem isa AbstractNLSModel ? [getfield(problem.counters, f) for f in fnls_counters] : zeros(Int, length(fnls_counters)) - push!( - stats, + + # Add the s to `stats` + safe_push!( [ solver_name problem_info @@ -166,13 +202,15 @@ function solve_problems( [s.solver_specific[k] for k in specific] ], ) + catch e - @error "caught exception" e - if first_problem - nb_unsuccessful_since_start += 1 + @error "Caught exception for problem $idx: $e" + + if first_problem[] + atomic_add!(nb_unsuccessful_since_start, 1) end - push!( - stats, + + safe_push!( [ solver_name problem_info @@ -191,7 +229,19 @@ function solve_problems( finalize(problem) end end - (skipthis && prune) || @info log_row(stats[end, col_idx]) + # (skipthis && prune) || @info log_row(stats[end, col_idx]) end + + # Multithreaded or single-threaded execution + if use_threads + Threads.@threads for idx = 1:num_problems + process_problem(idx) + end + else + for idx = 1:num_problems + process_problem(idx) + end + end + return stats end diff --git a/test/test_bmark.jl b/test/test_bmark.jl index 7eb6aae5..d76c064a 100644 --- a/test/test_bmark.jl +++ b/test/test_bmark.jl @@ -2,7 +2,6 @@ using DataFrames using Logging using NLPModels, ADNLPModels using SolverCore - import SolverCore.dummy_solver mutable struct CallableSolver end @@ -65,6 +64,53 @@ function test_bmark() pretty_stats(stats[:dummy]) end + @testset "Multithread vs Single-Thread Consistency" begin + problems = ( + ADNLPModel(x -> sum(x .^ 2), ones(2), name = "Quadratic"), + ADNLPModel( + x -> sum(x .^ 2), + ones(2), + x -> [sum(x) - 1], + [0.0], + [0.0], + name = "Cons quadratic", + ), + ADNLPModel( + x -> (x[1] - 1)^2 + 4 * (x[2] - x[1]^2)^2, + ones(2), + x -> [x[1]^2 + x[2]^2 - 1], + [0.0], + [0.0], + name = "Cons Rosen", + ), + ) + callable = CallableSolver() + solvers = Dict( + :dummy_1 => dummy_solver, + :callable => callable, + :dummy_solver_specific => + nlp -> dummy_solver( + nlp, + callback = (nlp, solver, stats) -> set_solver_specific!(stats, :foo, 1), + ), + ) + + # Run the single-threaded version + single_threaded_result = bmark_solvers(solvers, problems, use_threads = false) + multithreaded_result = bmark_solvers(solvers, problems, use_threads = true) + + # Compare the results + @test length(single_threaded_result) == length(multithreaded_result) + + for (mykey, df) in single_threaded_result + df1 = select(df, Not(:elapsed_time)) + df2 = select(multithreaded_result[mykey], Not(:elapsed_time)) + sort!(df1, [:id]) + sort!(df2, [:id]) + + @test isequal(df1, df2) + end + end @testset "Testing logging" begin nlps = [ADNLPModel(x -> sum(x .^ k), ones(2k), name = "Sum of power $k") for k = 2:4] push!( @@ -122,6 +168,7 @@ function test_bmark() ) stats = bmark_solvers(solvers, problems) + sort!(stats[:dummy_solver_specific], [:id]) # sort by id, multi threaded may not be in order @test stats[:dummy_solver_specific][1, :status] == :exception @test stats[:dummy_solver_specific][2, :status] == :first_order @test stats[:dummy_solver_specific][3, :status] == :exception