Skip to content

Commit 9d9c3bb

Browse files
committed
fix leak - make registry entry a WeakRef on the node releasing the darray
1 parent 6a376d1 commit 9d9c3bb

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

src/core.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ function d_closeall()
4444
crefs = copy(refs)
4545
for id in crefs
4646
if id[1] == myid() # sanity check
47-
haskey(registry, id) && close(registry[id])
47+
haskey(registry, id) && close(d_from_weakref_or_d(id))
4848
yield()
4949
end
5050
end

src/darray.jl

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ type DArray{T,N,A} <: AbstractArray{T,N}
3939
end
4040
release = (myid() == id[1])
4141

42-
haskey(registry, id) && return registry[id]
42+
d = d_from_weakref_or_d(id)
43+
if d === nothing
44+
d = new(id, dims, pids, indexes, cuts, lp, release)
45+
end
4346

44-
d = new(id, dims, pids, indexes, cuts, lp, release)
4547
if release
4648
push!(refs, id)
47-
registry[id] = d
49+
registry[id] = WeakRef(d)
4850

4951
# println("Installing finalizer for : ", d.id, ", : ", object_id(d), ", isbits: ", isbits(d))
5052
finalizer(d, close)
@@ -55,6 +57,12 @@ type DArray{T,N,A} <: AbstractArray{T,N}
5557
DArray{T,N,A}() where {T,N,A} = new()
5658
end
5759

60+
function d_from_weakref_or_d(id)
61+
d = get(registry, id, nothing)
62+
isa(d, WeakRef) && return d.value
63+
return d
64+
end
65+
5866
eltype{T}(::Type{DArray{T}}) = T
5967
empty_localpart(T,N,A) = A(Array{T}(ntuple(zero, N)))
6068

@@ -90,6 +98,7 @@ function DArray(id, init, dims, pids, idxs, cuts)
9098
A = take!(r)
9199
if myid() in pids
92100
d = registry[id]
101+
d = isa(d, WeakRef) ? d.value : d
93102
else
94103
T = eltype(A)
95104
N = length(dims)
@@ -137,6 +146,7 @@ function ddata(;T::Type=Any, init::Function=I->nothing, pids=workers(), data::Ve
137146

138147
if myid() in pids
139148
d = registry[id]
149+
d = isa(d, WeakRef) ? d.value : d
140150
else
141151
d = DArray{T,1,T}(id, (npids,), pids, idxs, cuts, Nullable{T}())
142152
end
@@ -320,7 +330,7 @@ function localpart{T,N,A}(d::DArray{T,N,A})
320330
return empty_localpart(T,N,A)::A
321331
end
322332

323-
return get(registry[d.id].localpart)::A
333+
return get(d.localpart)::A
324334
end
325335

326336
localpart(d::DArray, localidx...) = localpart(d)[localidx...]

src/serialize.jl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ function Base.deserialize{DT<:DArray}(S::AbstractSerializer, t::Type{DT})
2020
id = what[2]
2121

2222
if id_only
23-
if haskey(registry, id)
24-
return registry[id]
25-
else
23+
d = d_from_weakref_or_d(id)
24+
if d === nothing
2625
# access to fields will throw an error, at least the deserialization process will not
2726
# result in worker death
2827
d = DT()
2928
d.id = id
30-
return d
3129
end
30+
return d
3231
else
3332
# We are not a participating worker, deser fields and instantiate locally.
3433
dims = deserialize(S)

0 commit comments

Comments
 (0)