While putting together a simple but reasonably complete introductory example, I am running into ConcurrencyViolationError("lock must be held"), even though the official nested-loop example I adapted it from explicitly says that Dagger does not need locks.
using Dagger: @spawn
using Distributed

# add two further Julia processes, which could also run on other machines
addprocs(2, exeflags="--threads=2")

# Distributed.@everywhere executes code on all processes
@everywhere using Dagger, DataFrames, OnlineStats

# Dagger uses both threads and worker processes as processors
Dagger.all_processors()

# let's distribute some calculations
aggregators = [Mean, Variance, Extrema]
df = DataFrame()

# @sync waits until all tasks spawned inside the block have finished
@sync for i in 1:1000
    data = @spawn rand(10000)
    for agg in aggregators
        res = @spawn fit!(agg(), data)
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end

df.result .= fetch.(df.result)
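For comparison, this is the fallback formulation I would otherwise use: fetch each result immediately instead of storing the DTasks in the DataFrame and broadcasting fetch over them at the end. This is just a sketch under that assumption; I have not verified whether it sidesteps the error, and the eager fetch gives up most of the parallelism.

using Dagger: @spawn
using Distributed
addprocs(2, exeflags="--threads=2")
@everywhere using Dagger, DataFrames, OnlineStats

aggregators = [Mean, Variance, Extrema]
df = DataFrame()
for i in 1:1000
    data = @spawn rand(10000)
    for agg in aggregators
        # fetch right away, so only plain OnlineStats values are stored in df
        res = fetch(@spawn fit!(agg(), data))
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end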
The full stacktrace from running the original snippet:
julia> using Dagger: @spawn

julia> using Distributed
WARNING: using Distributed.@spawn in module Main conflicts with an existing identifier.

# add two further Julia processes, which could also run on other machines
julia> addprocs(2, exeflags="--threads=2")
2-element Vector{Int64}:
 2
 3

# Distributed.@everywhere executes code on all processes
julia> @everywhere using Dagger, DataFrames, OnlineStats

# Dagger uses both threads and worker processes as processors
julia> Dagger.all_processors()
Set{Dagger.Processor} with 5 elements:
  Dagger.ThreadProc(2, 1)
  Dagger.ThreadProc(3, 1)
  Dagger.ThreadProc(1, 1)
  Dagger.ThreadProc(3, 2)
  Dagger.ThreadProc(2, 2)

# let's distribute some calculations
julia> aggregators = [Mean, Variance, Extrema]
3-element Vector{UnionAll}:
 Mean
 Variance
 Extrema

julia> df = DataFrame()
0×0 DataFrame

# @sync waits until all tasks spawned inside the block have finished
julia> @sync for i in 1:1000
           data = @spawn rand(10000)
           for agg in aggregators
               res = @spawn fit!(agg(), data)
               push!(df, (i=i, aggregator=nameof(agg), result=res))
           end
       end

julia> df.result .= fetch.(df.result)
ERROR: ThunkFailedException:
Root Exception Type: CapturedException
Root Exception:
ConcurrencyViolationError("lock must be held")
Stacktrace:
[1] concurrency_violation
@ ./condition.jl:8
[2] assert_havelock
@ ./condition.jl:25 [inlined]
[3] assert_havelock
@ ./condition.jl:48 [inlined]
[4] assert_havelock
@ ./condition.jl:72 [inlined]
[5] _wait2
@ ./condition.jl:83
[6] #wait#645
@ ./condition.jl:127
[7] wait
@ ./condition.jl:125 [inlined]
[8] wait_for_conn
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:195
[9] check_worker_state
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:170
[10] send_msg_
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:172
[11] send_msg
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:122 [inlined]
[12] #remotecall_fetch#159
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:460
[13] remotecall_fetch
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:454
[14] remotecall_fetch
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:492 [inlined]
[15] #181
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:490 [inlined]
[16] forwardkeyerror
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:475
[17] poolget
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:489
[18] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:98
[19] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:96 [inlined]
[20] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:102
[21] #invokelatest#2
@ ./essentials.jl:892 [inlined]
[22] invokelatest
@ ./essentials.jl:889 [inlined]
[23] #166
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1553
Stacktrace:
[1] wait
@ ./task.jl:352 [inlined]
[2] fetch
@ ./task.jl:372 [inlined]
[3] fetch_report
@ ~/.julia/packages/Dagger/Tx54v/src/sch/util.jl:263
[4] do_task
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1562
[5] #143
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1302
This Thunk: Thunk(id=212, fit!(Extrema: n=0 | value=(min = Inf, max = -Inf, nmin = 0, nmax = 0), Thunk[209](rand, ...)))
Stacktrace:
[1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
@ Dagger ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:16
[2] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:11 [inlined]
[3] #fetch#73
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:58 [inlined]
[4] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:54 [inlined]
[5] _broadcast_getindex_evalf
@ ./broadcast.jl:709 [inlined]
[6] _broadcast_getindex
@ ./broadcast.jl:682 [inlined]
[7] getindex
@ ./broadcast.jl:636 [inlined]
[8] copyto_nonleaf!(dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, iter::Base.OneTo{…}, state::Int64, count::Int64)
@ Base.Broadcast ./broadcast.jl:1098
[9] restart_copyto_nonleaf!(newdest::Vector{…}, dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, val::Variance{…}, I::Int64, iter::Base.OneTo{…}, state::Int64, count::Int64)
@ Base.Broadcast ./broadcast.jl:1089
[10] copyto_nonleaf!(dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, iter::Base.OneTo{…}, state::Int64, count::Int64)
@ Base.Broadcast ./broadcast.jl:1105
[11] copy
@ ./broadcast.jl:950 [inlined]
[12] materialize
@ ./broadcast.jl:903 [inlined]
[13] copyto!(lazydf::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ DataFrames ~/.julia/packages/DataFrames/58MUJ/src/other/broadcasting.jl:207
[14] materialize!
@ ./broadcast.jl:914 [inlined]
[15] materialize!(dest::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ Base.Broadcast ./broadcast.jl:911
[16] top-level scope
@ REPL[9]:1
Some type information was truncated. Use `show(err)` to see complete types.
Reproduced on Julia 1.10.1 and 1.10.2 with Dagger 0.18.8.