
Commit eee71ec

Merge pull request #408 from JuliaParallel/jps/darray-single-part
DArray: Operations always return DArrays, and add local partition support for MPI
2 parents f63514a + 0fe91dc

11 files changed: +329 −211 lines changed

docs/src/darray.md

Lines changed: 84 additions & 76 deletions
@@ -21,18 +21,15 @@ This should not be confused with the [DistributedArrays.jl](https://github.com/J
 
 A `DArray` can be created in two ways: through an API similar to the usual
 `rand`, `ones`, etc. calls, or by distributing an existing array with
-`distribute`. Additionally, most operations on `DArray`s also return `DArray`s
-or an equivalent object which represents the operation being performed. It's
-generally not recommended to manually construct a `DArray` object unless you're
-developing the `DArray` itself.
+`distribute`. It's generally not recommended to manually construct a `DArray`
+object unless you're developing the `DArray` itself.
 
 ### Allocating new arrays
 
 As an example, one can allocate a random `DArray` by calling `rand` with a
 `Blocks` object as the first argument - `Blocks` specifies the size of
-partitions to be constructed. Note that the `DArray` is a lazy asynchronous
-object (i.e. operations on it may execute in the background), so to force it to
-be materialized, `fetch` may need to be called:
+partitions to be constructed, and must be the same number of dimensions as the
+array being allocated.
 
 ```julia
 # Add some Julia workers
@@ -48,9 +45,6 @@ julia> using Distributed; addprocs(6)
 julia> @everywhere using Dagger
 
 julia> DX = rand(Blocks(50, 50), 100, 100)
-Dagger.AllocateArray{Float64, 2}(100, 100)
-
-julia> fetch(DX)
 Dagger.DArray{Any, 2, typeof(cat)}(100, 100)
 ```
 
@@ -59,6 +53,18 @@ should be allocated which is in total 100 x 100, split into 4 blocks of size 50
 x 50, and initialized with random `Float64`s. Many other functions, like
 `randn`, `ones`, and `zeros` can be called in this same way.
 
+Note that the `DArray` is an asynchronous object (i.e. operations on it may
+execute in the background), so to force it to be materialized, `fetch` may need
+to be called:
+
+```julia
+julia> fetch(DX)
+Dagger.DArray{Any, 2, typeof(cat)}(100, 100)
+```
+
+This doesn't change the type or values of the `DArray`, but it does make sure
+that any pending operations have completed.
+
 To convert a `DArray` back into an `Array`, `collect` can be used to gather the
 data from all the Julia workers that they're on and combine them into a single
 `Array` on the worker calling `collect`:
@@ -97,26 +103,20 @@ julia> collect(DX)
 ### Distributing existing arrays
 
 Now let's look at constructing a `DArray` from an existing array object; we can
-do this by calling `Distribute`:
+do this by calling `distribute`:
 
 ```julia
 julia> Z = zeros(100, 500);
 
-julia> Dzeros = Distribute(Blocks(10, 50), Z)
-Distribute{Float64, 2}(100, 500)
-
-julia> fetch(Dzeros)
-Dagger.DArray{Any, 2, typeof(cat)}(100, 500)
-```
-
-If we wanted to skip having to call `fetch`, we could just call `distribute`,
-which blocks until distributing the array is completed:
-
-```julia
 julia> Dzeros = distribute(Z, Blocks(10, 50))
 Dagger.DArray{Any, 2, typeof(cat)}(100, 500)
 ```
 
+This will distribute the array partitions (in chunks of 10 x 50 matrices)
+across the workers in the Julia cluster in a relatively even distribution;
+future operations on a `DArray` may produce a different distribution from the
+one chosen by `distribute`.
+
 ## Broadcasting
 
 As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's
@@ -125,80 +125,88 @@ expected:
 
 ```julia
 julia> DX = rand(Blocks(50,50), 100, 100)
-Dagger.AllocateArray{Float64, 2}(100, 100)
+Dagger.DArray{Float64, 2, Blocks{2}, typeof(cat)}(100, 100)
 
 julia> DY = DX .+ DX
-Dagger.BCast{Base.Broadcast.Broadcasted{Dagger.DaggerBroadcastStyle, Tuple{Base.OneTo{Int64}, Base.OneTo{Int64}}, typeof(+), Tuple{Dagger.AllocateArray{Float64, 2}, Dagger.AllocateArray{Float64, 2}}}, Float64, 2}(100, 100)
+Dagger.DArray{Float64, 2, Blocks{2}, typeof(cat)}(100, 100)
 
 julia> DZ = DY .* 3
-Dagger.BCast{Base.Broadcast.Broadcasted{Dagger.DaggerBroadcastStyle, Tuple{Base.OneTo{Int64}, Base.OneTo{Int64}}, typeof(*), Tuple{Dagger.BCast{Base.Broadcast.Broadcasted{Dagger.DaggerBroadcastStyle, Tuple{Base.OneTo{Int64}, Base.OneTo{Int64}}, typeof(+), Tuple{Dagger.AllocateArray{Float64, 2}, Dagger.AllocateArray{Float64, 2}}}, Float64, 2}, Int64}}, Float64, 2}(100, 100)
-
-julia> size(DZ)
-(100, 100)
-
-julia> DA = fetch(DZ)
-Dagger.DArray{Any, 2, typeof(cat)}(100, 100)
+Dagger.DArray{Float64, 2, Blocks{2}, typeof(cat)}(100, 100)
 ```
 
-Now, `DA` is the lazy result of computing `(DX .+ DX) .* 3`. Note that `DArray`
-objects are immutable, and operations on them are thus functional
+Now, `DZ` will contain the result of computing `(DX .+ DX) .* 3`. Note that
+`DArray` objects are immutable, and operations on them are thus functional
 transformations of their input `DArray`.
 
 !!! note
     Support for mutation of `DArray`s is planned for a future release
 
-Additionally, note that we can still call `size` on these lazy `BCast` objects,
-as it's clear what the final output's size will be.
-
 ```
-julia> Dagger.chunks(DA)
+julia> Dagger.chunks(DZ)
+2×2 Matrix{Any}:
+ EagerThunk (finished)  EagerThunk (finished)
+ EagerThunk (finished)  EagerThunk (finished)
+
+julia> Dagger.chunks(fetch(DZ))
 2×2 Matrix{Union{Thunk, Dagger.Chunk}}:
 Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(4, 8, 0x0000000000004e20), ThreadProc(4, 1), AnyScope(), true)  …  Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(2, 5, 0x0000000000004e20), ThreadProc(2, 1), AnyScope(), true)
 Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(5, 5, 0x0000000000004e20), ThreadProc(5, 1), AnyScope(), true)     Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(3, 3, 0x0000000000004e20), ThreadProc(3, 1), AnyScope(), true)
 ```
 
 Here we can see the `DArray`'s internal representation of the partitions, which
-are stored as Dagger `Chunk` objects (which, as a reminder, may reference data
-which exists on other Julia workers). One doesn't typically need to worry about
-these internal details unless implementing operators on `DArray`s.
+are stored as either `EagerThunk` objects (representing an ongoing or completed
+computation) or `Chunk` objects (which reference data which exist locally or on
+other Julia workers). Of course, one doesn't typically need to worry about
+these internal details unless implementing low-level operations on `DArray`s.
 
-Finally, it's all the same to get the result of this complicated set of
-broadcast operations; just use `fetch` to get a `DArray`, and `collect` to get
-an `Array`:
+Finally, it's easy to see the results of this combination of broadcast
+operations; just use `collect` to get an `Array`:
 
 ```
-julia> DA *= 2
-Dagger.BCast{Base.Broadcast.Broadcasted{Dagger.DaggerBroadcastStyle, Tuple{Base.OneTo{Int64}, Base.OneTo{Int64}}, typeof(*), Tuple{Dagger.DArray{Any, 2, typeof(cat)}, Int64}}, Any, 2}(100, 100)
-
-julia> fetch(DA)
-Dagger.DArray{Any, 2, typeof(cat)}(100, 100)
-
-julia> collect(DA)
+julia> collect(DZ)
 100×100 Matrix{Float64}:
- 11.6021  9.12356  0.407394  11.2524  4.89022  …  3.26229  1.23314  1.96686  3.04927  3.65649
- 3.78571  6.24751  2.74505  8.3009  11.4331  0.336563  9.37329  2.84604  8.52946  10.9168
- 3.9987  0.641359  3.1918  11.4368  4.41555  1.12344  5.44424  3.49739  3.32251  8.86685
- 7.90953  1.50281  1.91451  4.89621  9.44033  2.97169  9.68018  11.8686  4.74035  8.49143
- 1.0611  5.5909  10.364  5.48194  6.821  0.66667  5.33619  5.56166  8.19974  7.02791
- 7.47418  11.3061  7.9809  2.34617  7.90996  …  6.30402  10.2203  4.92873  8.22024  7.41224
- 7.06002  0.604601  11.6572  4.95498  0.671179  5.42867  8.19648  0.611793  11.9469  1.6628
- 2.97898  0.738068  4.44802  5.81322  7.3991  8.71256  2.48281  11.0882  10.9801  11.2464
- 1.34064  7.37116  1.14921  3.95358  9.73416  7.83354  10.8357  0.270462  9.93926  9.05206
- 8.77125  0.44711  11.7197  11.6632  8.21711  2.20143  5.06451  3.92386  3.90197  4.32807
- 10.6201  4.82176  8.4164  10.5457  2.65546  …  10.4681  1.00604  7.05816  6.33214  4.13517
- 10.6633  10.2059  7.06543  1.58093  5.33819  7.86821  9.56034  2.37929  4.39098  11.6246
- 11.1778  6.76896  10.249  11.3147  9.7838  6.17893  0.433731  0.713574  9.99747  0.570143
- ⋮  ⋱  ⋮
- 6.19119  11.027  10.0742  3.51595  0.48755  3.56015  7.43083  0.624126  9.0292  3.04445
- 3.38276  5.32876  2.66453  4.08388  6.51538  10.8722  5.14729  3.7499  7.11074  11.3595
- 4.10258  0.474511  0.852416  4.79806  5.21663  …  9.96304  5.82279  0.818069  9.85573  8.9645
- 6.03249  8.82392  2.14424  10.7512  8.28873  8.32419  2.96016  4.97967  2.52393  2.31372
- 7.25826  8.49308  3.90884  3.03783  3.67546  6.63201  5.18839  1.99734  8.51863  8.7656
- 11.6969  1.29504  0.745432  0.119002  6.11005  5.3909  2.61199  11.5168  8.25466  2.29896
- 10.7  9.66697  2.34518  6.68043  4.09362  11.6484  2.53879  9.95172  3.97177  9.53493
- 11.652  3.53655  8.38743  3.75028  11.8518  …  3.11588  1.07276  8.12898  8.80697  1.50331
- 9.69158  11.2718  8.98014  2.71964  4.11854  0.840723  4.55286  4.47269  8.30213  0.927262
- 10.5868  11.9395  8.22633  6.71811  9.6942  2.2561  0.233772  1.76577  9.67937  8.29349
- 9.19925  5.77384  2.18139  10.3563  6.7716  9.8496  11.3777  6.43372  11.2769  4.82911
- 9.15905  8.12721  11.1374  6.32082  3.49716  7.23124  10.3995  6.98103  7.72209  6.08033
+ 5.72754  1.23614  4.67045  4.89095  3.40126  …  5.07663  1.60482  5.04386  1.44755  2.5682
+ 0.189402  3.64462  5.92218  3.94603  2.32192  1.47115  4.6364  0.778867  3.13838  4.87871
+ 3.3492  3.96929  3.46377  1.29776  3.59547  4.82616  1.1512  3.02528  3.05538  0.139763
+ 5.0981  5.72564  5.1128  0.954708  2.04515  2.50365  5.97576  5.17683  4.79587  1.80113
+ 1.0737  5.25768  4.25363  0.943006  4.25783  4.1801  3.14444  3.07428  4.41075  2.90252
+ 5.48746  5.17286  3.99259  0.939678  3.76034  …  0.00763076  2.98176  1.83674  1.61791  3.33216
+ 1.05088  4.98731  1.24925  3.57909  2.53366  5.96733  2.35186  5.75815  3.32867  1.15317
+ 0.0335647  3.52524  0.159895  5.49908  1.33206  3.51113  0.0753356  1.5557  0.884252  1.45085
+ 5.27506  2.00472  0.00636555  0.461574  5.16735  2.74457  1.14679  2.39407  0.151713  0.85013
+ 4.43607  4.50304  4.73833  1.92498  1.64338  4.34602  4.62612  3.28248  1.32726  5.50207
+ 5.22308  2.53069  1.27758  2.62013  3.73961  …  5.91626  2.54943  5.41472  1.67197  4.09026
+ 1.09684  2.53189  4.23236  0.14055  0.889771  2.20834  2.31341  5.23121  1.74341  4.00588
+ 2.55253  4.1789  3.50287  4.96437  1.26724  3.04302  3.74262  5.46611  1.39375  4.13167
+ 3.03291  4.43932  2.85678  1.59531  0.892166  0.414873  0.643423  4.425  5.48145  5.93383
+ 0.726568  0.516686  3.00791  3.76354  3.32603  2.19812  2.15836  3.85669  3.67233  2.1261
+ 2.22763  1.36281  4.41129  5.29229  1.10093  …  0.45575  4.38389  0.0526105  2.14792  2.26734
+ 2.58065  1.99564  4.82657  0.485823  5.24881  2.16097  3.59942  2.25021  3.96498  0.906153
+ 0.546354  0.982523  1.94377  2.43136  2.77469  4.43507  5.98402  0.692576  1.53298  1.20621
+ 4.71374  4.99402  1.5876  1.81629  2.56269  1.56588  5.42296  0.160867  4.17705  1.13915
+ 2.97733  2.4476  3.82752  1.3491  3.5684  1.23393  1.86595  3.97154  4.6419  4.8964
+ ⋮  ⋱  ⋮
+ 3.49162  2.46081  1.21659  2.96078  4.58102  5.97679  3.34463  0.202255  2.85433  0.0786219
+ 0.894714  2.87079  5.09409  2.2922  3.18928  1.5886  0.163886  5.99251  0.697163  5.75684
+ 2.98867  2.2115  5.07771  0.124194  3.88948  3.61176  0.0732554  4.11606  0.424547  0.621287
+ 5.95438  3.45065  0.194537  3.57519  1.2266  2.93837  1.02609  5.84021  5.498  3.53337
+ 2.234  0.275185  0.648536  0.952341  4.41942  …  4.78238  2.24479  3.31705  5.76518  0.621195
+ 5.54212  2.24089  5.81702  1.96178  4.99409  0.30557  3.55499  0.851678  1.80504  5.81679
+ 5.79409  4.86848  3.10078  4.22252  4.488  3.03427  2.32752  3.54999  0.967972  4.0385
+ 3.06557  5.4993  2.44263  1.82296  0.166883  0.763588  1.59113  4.33305  2.8359  5.56667
+ 3.86797  3.73251  3.14999  4.11437  0.454938  0.166886  0.303827  4.7934  3.37593  2.29402
+ 0.762158  4.3716  0.897798  4.60541  2.96872  …  1.60095  0.480542  1.41945  1.33071  0.308611
+ 1.20503  5.66645  4.03237  3.90194  1.55996  3.58442  4.6735  5.52211  5.46891  2.43612
+ 5.51133  1.13591  3.26696  4.24821  4.60696  3.73251  3.25989  4.735  5.61674  4.32185
+ 2.46529  0.444928  3.85984  5.49469  1.13501  1.36861  5.34651  0.398515  0.239671  5.36412
+ 2.62837  3.99017  4.52569  3.54811  3.35515  4.13514  1.22304  1.01833  3.42534  3.58399
+ 4.88289  5.09945  0.267154  3.38482  4.53408  …  3.71752  5.22216  1.39987  1.38622  5.47351
+ 0.1046  3.65967  1.62098  5.33185  0.0822769  3.30334  5.90173  4.06603  5.00789  4.40601
+ 1.9622  0.755491  2.12264  1.67299  2.34482  4.50632  3.84387  3.22232  5.23164  2.97735
+ 4.37208  5.15253  0.346373  2.98573  5.48589  0.336134  2.25751  2.39057  1.97975  3.24243
+ 3.83293  1.69017  3.00189  1.80388  3.43671  5.94085  1.27609  3.98737  0.334963  5.84865
 ```
+
+A variety of other operations exist on the `DArray`, and it should otherwise
+generally behave like any other `AbstractArray` type. If you find that it's
+missing an operation that you need, please file an issue!
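
Editor's note: as a condensed illustration of the workflow these docs now describe (a sketch only; the printed type parameters may differ across Dagger versions), every step below returns or consumes a `DArray` directly:

```julia
using Distributed; addprocs(4)
@everywhere using Dagger

DX = rand(Blocks(50, 50), 100, 100)  # allocate a 100x100 DArray in 50x50 partitions
DZ = (DX .+ DX) .* 3                 # broadcasting now returns a DArray directly
fetch(DZ)                            # block until all pending operations complete
Z = collect(DZ)                      # gather the partitions into a Matrix{Float64}
@assert Z ≈ collect(DX) .* 6
```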

docs/src/index.md

Lines changed: 41 additions & 0 deletions
@@ -130,3 +130,44 @@ calls to `hist!` may run in parallel
 By using `map` on `temp_bins`, we then make a copy of each worker's bins that
 we can safely return back to our current worker, and sum them together to get
 our total histogram.
+
+-----
+
+## Quickstart: Distributed Arrays
+
+Dagger's `DArray` type represents a distributed array, where a single large
+array is implemented as a set of smaller array partitions, which may be
+distributed across a Julia cluster.
+
+For more details: [Distributed Arrays](@ref)
+
+### Distribute an existing array
+
+Distributing any kind of array into a `DArray` is easy: just use `distribute`,
+and specify the partitioning you desire with `Blocks`. For example, to
+distribute a 16 x 16 matrix in 4 x 4 partitions:
+
+```julia
+A = rand(16, 16)
+DA = distribute(A, Blocks(4, 4))
+```
+
+### Allocate a distributed array directly
+
+To allocate a `DArray`, just pass your `Blocks` partitioning object into the
+appropriate allocation function, such as `rand`, `ones`, or `zeros`:
+
+```julia
+rand(Blocks(20, 20), 100, 100)
+ones(Blocks(20, 100), 100, 2000)
+zeros(Blocks(50, 20), 300, 200)
+```
+
+### Convert a `DArray` back into an `Array`
+
+To get back an `Array` from a `DArray`, just call `collect`:
+
+```julia
+DA = rand(Blocks(32, 32), 256, 128)
+collect(DA) # returns a `Matrix{Float64}`
+```
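
Editor's note: tying the quickstart steps together, a brief round-trip sketch (assuming Dagger is loaded as in the examples above) — distributing and then collecting should reproduce the original values exactly, since no floating-point operations are applied:

```julia
using Dagger

A = rand(16, 16)
DA = distribute(A, Blocks(4, 4))  # sixteen 4 x 4 partitions
@assert collect(DA) == A          # round-trip preserves the data
```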

src/array/alloc.jl

Lines changed: 16 additions & 9 deletions
@@ -8,6 +8,7 @@ mutable struct AllocateArray{T,N} <: ArrayOp{T,N}
     f::Function
     domain::ArrayDomain{N}
     domainchunks
+    partitioning::AbstractBlocks
 end
 size(a::AllocateArray) = size(a.domain)
 
@@ -18,15 +19,15 @@ function _cumlength(len, step)
     cumsum(extra > 0 ? vcat(ps, extra) : ps)
 end
 
-function partition(p::Blocks, dom::ArrayDomain)
+function partition(p::AbstractBlocks, dom::ArrayDomain)
     DomainBlocks(map(first, indexes(dom)),
                  map(_cumlength, map(length, indexes(dom)), p.blocksize))
 end
 
 function stage(ctx, a::AllocateArray)
     alloc(idx, sz) = a.f(idx, a.eltype, sz)
     thunks = [Dagger.@spawn alloc(i, size(x)) for (i, x) in enumerate(a.domainchunks)]
-    DArray(a.eltype,a.domain, a.domainchunks, thunks)
+    return DArray(a.eltype, a.domain, a.domainchunks, thunks, a.partitioning)
 end
 
 function Base.rand(p::Blocks, eltype::Type, dims)
@@ -35,7 +36,8 @@ function Base.rand(p::Blocks, eltype::Type, dims)
         rand(MersenneTwister(s+idx), x...)
     end
     d = ArrayDomain(map(x->1:x, dims))
-    AllocateArray(eltype, f, d, partition(p, d))
+    a = AllocateArray(eltype, f, d, partition(p, d), p)
+    return _to_darray(a)
 end
 
 Base.rand(p::Blocks, t::Type, dims::Integer...) = rand(p, t, dims)
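
Editor's note: for intuition about the `partition` change above, `_cumlength` turns an axis length and a block size into cumulative partition end-indices. A small illustrative reimplementation (hypothetical name `cumlength`; the real `_cumlength` body is only partially visible in this hunk, so this is an assumption based on its last line):

```julia
# Sketch of the cumulative-length computation behind partition():
# split `len` into `step`-sized pieces plus any remainder, then
# accumulate to get each partition's ending index along one axis.
function cumlength(len, step)
    nparts = div(len, step)  # number of full-size partitions
    extra = len % step       # length of the trailing partial partition
    ps = fill(step, nparts)
    return cumsum(extra > 0 ? vcat(ps, extra) : ps)
end

cumlength(100, 30)  # [30, 60, 90, 100] -> four partitions, the last of length 10
cumlength(100, 50)  # [50, 100]         -> an even split, no remainder
```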
@@ -48,21 +50,24 @@ function Base.randn(p::Blocks, dims)
         randn(MersenneTwister(s+idx), x...)
     end
     d = ArrayDomain(map(x->1:x, dims))
-    AllocateArray(Float64, f, d, partition(p, d))
+    a = AllocateArray(Float64, f, d, partition(p, d), p)
+    return _to_darray(a)
 end
 Base.randn(p::Blocks, dims::Integer...) = randn(p, dims)
 
 function Base.ones(p::Blocks, eltype::Type, dims)
     d = ArrayDomain(map(x->1:x, dims))
-    AllocateArray(eltype, (_, x...) -> ones(x...), d, partition(p, d))
+    a = AllocateArray(eltype, (_, x...) -> ones(x...), d, partition(p, d), p)
+    return _to_darray(a)
 end
 Base.ones(p::Blocks, t::Type, dims::Integer...) = ones(p, t, dims)
 Base.ones(p::Blocks, dims::Integer...) = ones(p, Float64, dims)
 Base.ones(p::Blocks, dims::Tuple) = ones(p, Float64, dims)
 
 function Base.zeros(p::Blocks, eltype::Type, dims)
     d = ArrayDomain(map(x->1:x, dims))
-    AllocateArray(eltype, (_, x...) -> zeros(x...), d, partition(p, d))
+    a = AllocateArray(eltype, (_, x...) -> zeros(x...), d, partition(p, d), p)
+    return _to_darray(a)
 end
 Base.zeros(p::Blocks, t::Type, dims::Integer...) = zeros(p, t, dims)
 Base.zeros(p::Blocks, dims::Integer...) = zeros(p, Float64, dims)
@@ -73,7 +78,7 @@ function Base.zero(x::DArray{T,N}) where {T,N}
     sd = first(x.subdomains)
     part_size = ntuple(i->sd.indexes[i].stop, N)
    a = zeros(Blocks(part_size...), T, dims)
-    return cached_stage(Context(global_context()), a)
+    return _to_darray(a)
 end
 
 function sprand(p::Blocks, m::Integer, n::Integer, sparsity::Real)
@@ -82,13 +87,15 @@ function sprand(p::Blocks, m::Integer, n::Integer, sparsity::Real)
         sprand(MersenneTwister(s+idx), sz...,sparsity)
     end
     d = ArrayDomain((1:m, 1:n))
-    AllocateArray(Float64, f, d, partition(p, d))
+    a = AllocateArray(Float64, f, d, partition(p, d), p)
+    return _to_darray(a)
 end
 
 function sprand(p::Blocks, n::Integer, sparsity::Real)
     s = rand(UInt)
     f = function (idx,t,sz)
         sprand(MersenneTwister(s+idx), sz...,sparsity)
     end
-    AllocateArray(Float64, f, d, partition(p, ArrayDomain((1:n,))))
+    a = AllocateArray(Float64, f, d, partition(p, ArrayDomain((1:n,))), p)
+    return _to_darray(a)
 end
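
Editor's note: the net effect of this file's changes is that these allocators hand back a fully-wrapped `DArray` (via `_to_darray`) instead of a lazy `AllocateArray`. A quick usage sketch under that assumption, with a couple of workers:

```julia
using Distributed; addprocs(2)
@everywhere using Dagger

DA = ones(Blocks(50, 50), 100, 100)  # now returns a Dagger.DArray directly
DB = zeros(Blocks(50, 50), 100, 100)
sum(collect(DA .+ DB)) == 100 * 100  # every element is 1.0, so the sum is 10000
```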
