Parallel benchmarks #176

Open · wants to merge 3 commits into base: main
15 changes: 13 additions & 2 deletions .github/workflows/ci.yml
@@ -7,7 +7,7 @@ on:
types: [opened, synchronize, reopened]
jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ matrix.threads }} threads - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
@@ -22,10 +22,18 @@ jobs:
- windows-latest
arch:
- x64
threads:
- 1
- 2
include:
- version: '1'
os: macos-latest
arch: arm64
threads: 1
- version: '1'
os: macos-latest
arch: arm64
threads: 2
steps:
- name: Set git to use LF
run: |
@@ -48,7 +56,10 @@ jobs:
${{ runner.os }}-test-
${{ runner.os }}-
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
- name: Run tests with ${{ matrix.threads }} threads
uses: julia-actions/julia-runtest@v1
env:
JULIA_NUM_THREADS: ${{ matrix.threads }}
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
with:
106 changes: 78 additions & 28 deletions src/run_solver.jl
@@ -1,4 +1,5 @@
export solve_problems
using Base.Threads

"""
solve_problems(solver, solver_name, problems; kwargs...)
@@ -13,6 +14,7 @@ Apply a solver to a set of problems.
CUTEst problems).

#### Keyword arguments
* `use_threads::Bool`: whether to use threads (default: `true`);
Member:

Why do we need this keyword? Threads should be enabled/disabled with the environment variable.

Contributor (author):

I think the use_threads keyword is valuable to keep, even with the environment variable, for the following reasons:

Testing and debugging: the flag lets us explicitly override the environment variable during testing or debugging. It is particularly useful to verify that the application behaves correctly in both threaded and non-threaded modes without rewriting the functionality in the unit tests.

Fine-grained user control: some users may want to override the global threading setting in specific scenarios, e.g. disabling threading for this application while leaving it enabled for other programs, to avoid resource contention.

Keeping this flag ensures flexibility and adaptability, making the tool more useful for both development and usage scenarios.

Member:

A user who doesn’t set JULIA_NUM_THREADS does not expect anything to run multi-threaded, yet use_threads will be true. I think that only introduces confusion.
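One possible compromise, sketched here as an assumption rather than anything proposed in this PR (the helper name `run_all` is hypothetical), is to derive the keyword's default from the active thread count: a user who never sets JULIA_NUM_THREADS then gets serial execution, while an explicit `use_threads = true/false` still overrides it.

```julia
using Base.Threads

# Hypothetical helper, not part of the PR: `use_threads` defaults from the
# actual thread count, so an unset JULIA_NUM_THREADS (nthreads() == 1)
# implies serial execution unless the caller opts in explicitly.
function run_all(work, items::AbstractVector; use_threads::Bool = nthreads() > 1)
    if use_threads
        @threads for item in items   # @threads requires an indexable collection
            work(item)
        end
    else
        foreach(work, items)
    end
    return nothing
end
```

With this default, both call sites behave identically under `JULIA_NUM_THREADS=1`, and the keyword only changes behavior when threads are actually available.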

* `solver_logger::AbstractLogger`: logger wrapping the solver call (default: `NullLogger`);
* `reset_problem::Bool`: reset the problem's counters before solving (default: `true`);
* `skipif::Function`: function to be applied to a problem and return whether to skip it
@@ -31,6 +33,7 @@ function solve_problems(
solver,
solver_name::TName,
problems;
use_threads::Bool = true,
solver_logger::AbstractLogger = NullLogger(),
reset_problem::Bool = true,
skipif::Function = x -> false,
@@ -46,13 +49,16 @@ function solve_problems(
:dual_feas,
:primal_feas,
],
info_hdr_override::Dict{Symbol, String} = Dict{Symbol, String}(:solver_name => "Solver"),
info_hdr_override::Dict{Symbol, String} = Dict(:solver_name => "Solver"),
prune::Bool = true,
kwargs...,
) where {TName}

# Collect information about counters
Member:
Please remove these comments and blank lines that do not have anything to do with the new functionality.

f_counters = collect(fieldnames(Counters))
fnls_counters = collect(fieldnames(NLSCounters))[2:end] # Excludes :counters
ncounters = length(f_counters) + length(fnls_counters)

types = [
TName
Int
@@ -88,27 +94,49 @@
]
stats = DataFrame(names .=> [T[] for T in types])

# Thread-safe mechanisms
stats_lock = ReentrantLock() # Lock for modifying shared data structures
first_problem = Atomic{Bool}(true) # Atomic for safe interaction
# specific = Atomic{Vector{Symbol}}([]) # Use atomic for thread-safe updates
specific = Symbol[]
nb_unsuccessful_since_start = Atomic{Int}(0)

# Prepare DataFrame columns for logging
col_idx = indexin(colstats, names)

first_problem = true
nb_unsuccessful_since_start = 0
@info log_header(colstats, types[col_idx], hdr_override = info_hdr_override)

for (id, problem) in enumerate(problems)
# Convert problems to an indexable vector
problem_list = collect(problems)
Member:
We really do not want this. If problems is a generator of all CUTEst models, this line will try to materialize them all at once and hold them all in memory simultaneously, which is not acceptable.
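For reference, one lazy pattern that would sidestep the `collect` (a sketch under the assumption that processing order does not matter; the name `foreach_problem` is hypothetical) feeds worker tasks from a bounded Channel, so only a handful of problems are ever held in memory at once:

```julia
using Base.Threads

# Hypothetical sketch: distribute work from a lazily consumed iterator.
# The bounded Channel buffers at most `ntasks` pending items, so a generator
# of CUTEst models is never materialized all at once.
function foreach_problem(process, problems; ntasks::Int = nthreads())
    jobs = Channel{Tuple{Int, Any}}(ntasks) do ch
        for (id, problem) in enumerate(problems)
            put!(ch, (id, problem))   # blocks while the buffer is full
        end
    end                               # channel closes when the feeder returns
    workers = [Threads.@spawn begin
        for (id, problem) in jobs     # each take! is synchronized by the Channel
            process(id, problem)
        end
    end for _ = 1:ntasks]
    foreach(wait, workers)
    return nothing
end
```

Iterating the channel in each worker yields items until the feeder closes it, so this works unchanged for generators, vectors, or any iterable, without ever calling `length`.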

num_problems = length(problem_list)

# Function to safely push data to the DataFrame
function safe_push!(data_entry)
lock(stats_lock) do
push!(stats, data_entry)
end
end

# Function to process a single problem
function process_problem(idx)
problem = problem_list[idx]

# Reset the problem, if requested
if reset_problem
reset!(problem)
end

# Problem metadata
nequ = problem isa AbstractNLSModel ? problem.nls_meta.nequ : 0
problem_info = [id; problem.meta.name; problem.meta.nvar; problem.meta.ncon; nequ]
problem_info = [idx; problem.meta.name; problem.meta.nvar; problem.meta.ncon; nequ]

# Determine if the problem should be skipped
skipthis = skipif(problem)
# Check if this problem should be skipped
if skipthis
if first_problem && !prune
nb_unsuccessful_since_start += 1
if first_problem[] && !prune
atomic_add!(nb_unsuccessful_since_start, 1)
end
prune || push!(
stats,
prune || safe_push!(
[
solver_name
problem_info
Expand All @@ -124,33 +152,41 @@ function solve_problems(
],
)
finalize(problem)
return
else
try
s = with_logger(solver_logger) do
solver(problem; kwargs...)
end
if first_problem
for (k, v) in s.solver_specific
if !(typeof(v) <: AbstractVector)
insertcols!(
stats,
ncol(stats) + 1,
k => Vector{Union{typeof(v), Missing}}(undef, nb_unsuccessful_since_start),
)
push!(specific, k)

# Handle first problem (thread-safe updates)
if first_problem[]
first_problem[] = false
lock(stats_lock) do
for (k, v) in s.solver_specific
if !(typeof(v) <: AbstractVector)
insertcols!(
stats,
ncol(stats) + 1,
k => Vector{Union{typeof(v), Missing}}(undef, nb_unsuccessful_since_start[]),
)
push!(specific, k)
end
end
end
first_problem = false
end

# Collect counters
counters_list =
problem isa AbstractNLSModel ?
[getfield(problem.counters.counters, f) for f in f_counters] :
[getfield(problem.counters, f) for f in f_counters]
nls_counters_list =
problem isa AbstractNLSModel ? [getfield(problem.counters, f) for f in fnls_counters] :
zeros(Int, length(fnls_counters))
push!(
stats,

# Add the s to `stats`
safe_push!(
[
solver_name
problem_info
Expand All @@ -166,13 +202,15 @@ function solve_problems(
[s.solver_specific[k] for k in specific]
],
)

catch e
@error "caught exception" e
if first_problem
nb_unsuccessful_since_start += 1
@error "Caught exception for problem $idx: $e"

if first_problem[]
atomic_add!(nb_unsuccessful_since_start, 1)
end
push!(
stats,

safe_push!(
[
solver_name
problem_info
Expand All @@ -191,7 +229,19 @@ function solve_problems(
finalize(problem)
end
end
(skipthis && prune) || @info log_row(stats[end, col_idx])
# (skipthis && prune) || @info log_row(stats[end, col_idx])
end

# Multithreaded or single-threaded execution
if use_threads
Threads.@threads for idx = 1:num_problems
process_problem(idx)
end
else
for idx = 1:num_problems
process_problem(idx)
end
end

return stats
end
49 changes: 48 additions & 1 deletion test/test_bmark.jl
@@ -2,7 +2,6 @@ using DataFrames
using Logging
using NLPModels, ADNLPModels
using SolverCore

import SolverCore.dummy_solver

mutable struct CallableSolver end
@@ -65,6 +64,53 @@ function test_bmark()
pretty_stats(stats[:dummy])
end

@testset "Multithread vs Single-Thread Consistency" begin
problems = (
ADNLPModel(x -> sum(x .^ 2), ones(2), name = "Quadratic"),
ADNLPModel(
x -> sum(x .^ 2),
ones(2),
x -> [sum(x) - 1],
[0.0],
[0.0],
name = "Cons quadratic",
),
ADNLPModel(
x -> (x[1] - 1)^2 + 4 * (x[2] - x[1]^2)^2,
ones(2),
x -> [x[1]^2 + x[2]^2 - 1],
[0.0],
[0.0],
name = "Cons Rosen",
),
)
callable = CallableSolver()
solvers = Dict(
:dummy_1 => dummy_solver,
:callable => callable,
:dummy_solver_specific =>
nlp -> dummy_solver(
nlp,
callback = (nlp, solver, stats) -> set_solver_specific!(stats, :foo, 1),
),
)

# Run the single-threaded version
single_threaded_result = bmark_solvers(solvers, problems, use_threads = false)
multithreaded_result = bmark_solvers(solvers, problems, use_threads = true)

# Compare the results
@test length(single_threaded_result) == length(multithreaded_result)

for (mykey, df) in single_threaded_result
df1 = select(df, Not(:elapsed_time))
df2 = select(multithreaded_result[mykey], Not(:elapsed_time))
sort!(df1, [:id])
sort!(df2, [:id])

@test isequal(df1, df2)
end
end
@testset "Testing logging" begin
nlps = [ADNLPModel(x -> sum(x .^ k), ones(2k), name = "Sum of power $k") for k = 2:4]
push!(
@@ -122,6 +168,7 @@
)
stats = bmark_solvers(solvers, problems)

sort!(stats[:dummy_solver_specific], [:id]) # sort by id, multi threaded may not be in order
@test stats[:dummy_solver_specific][1, :status] == :exception
@test stats[:dummy_solver_specific][2, :status] == :first_order
@test stats[:dummy_solver_specific][3, :status] == :exception