Skip to content

Commit 70cf00f

Browse files
bors[bot]leios
andauthored
Merge #53
53: Implement async_copy! r=vchuravy a=vchuravy bors r+ Co-authored-by: James Schloss <jrs.schloss@gmail.com>
2 parents 7bfbc8c + 87e4ef4 commit 70cf00f

File tree

4 files changed

+113
-33
lines changed

4 files changed

+113
-33
lines changed

src/KernelAbstractions.jl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ module KernelAbstractions
33
export @kernel
44
export @Const, @localmem, @private, @uniform, @synchronize, @index, groupsize
55
export Device, GPU, CPU, CUDA, Event
6+
export async_copy!
7+
68

79
using MacroTools
810
using StaticArrays
@@ -61,10 +63,12 @@ macro Const end
6163
abstract type Event end
6264
import Base.wait
6365

64-
# TODO
66+
"""
67+
async_copy!(::Device, dest::AbstractArray, src::AbstractArray; dependencies = nothing)
68+
69+
Perform an asynchronous copy on the device. Returns an event that can be waited upon.
70+
"""
6571
function async_copy! end
66-
# function register end
67-
# function unregister end
6872

6973
###
7074
# Kernel language

src/backends/cpu.jl

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,30 @@ function wait(::CPU, ev::CPUEvent, progress=nothing)
1515
else
1616
while !Base.istaskdone(ev.task)
1717
progress()
18-
yield() # yield to the scheduler
18+
end
19+
end
20+
end
21+
function __waitall(::CPU, dependencies, progress)
22+
if dependencies isa Event
23+
dependencies = (dependencies,)
24+
end
25+
if dependencies !== nothing
26+
dependencies = collect(dependencies)
27+
cpudeps = filter(d->d isa CPUEvent && d.task !== nothing, dependencies)
28+
otherdeps = filter(d->!(d isa CPUEvent), dependencies)
29+
Base.sync_end(map(e->e.task, cpudeps))
30+
for event in otherdeps
31+
wait(CPU(), event, progress)
1932
end
2033
end
2134
end
2235

36+
function async_copy!(::CPU, A, B; dependencies=nothing)
37+
__waitall(CPU(), dependencies, yield)
38+
copyto!(A, B)
39+
return CPUEvent(nothing)
40+
end
41+
2342
function (obj::Kernel{CPU})(args...; ndrange=nothing, workgroupsize=nothing, dependencies=nothing)
2443
if ndrange isa Integer
2544
ndrange = (ndrange,)
@@ -47,20 +66,7 @@ end
4766
# Inference barrier
4867
function __run(obj, ndrange, iterspace, args, dependencies)
4968
return Threads.@spawn begin
50-
if dependencies !== nothing
51-
cpu_tasks = Core.Task[]
52-
for event in dependencies
53-
if event isa CPUEvent && event.task isa Core.Task
54-
push!(cpu_tasks, event.task)
55-
end
56-
end
57-
!isempty(cpu_tasks) && Base.sync_end(cpu_tasks)
58-
for event in dependencies
59-
if !(event isa CPUEvent)
60-
wait(CPU(), event, ()->yield())
61-
end
62-
end
63-
end
69+
__waitall(CPU(), dependencies, yield)
6470
@sync begin
6571
# TODO: how do we use the information that the iteration space maps perfectly to
6672
# the ndrange without incurring a 2x compilation overhead

src/backends/cuda.jl

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import CUDAnative, CUDAdrv
22
import CUDAnative: cufunction, DevicePtr
3-
import CUDAdrv: CuEvent, CuStream, CuDefaultStream
3+
import CUDAdrv: CuEvent, CuStream, CuDefaultStream, Mem
44

55
const FREE_STREAMS = CuStream[]
66
const STREAMS = CuStream[]
@@ -76,10 +76,66 @@ wait(::CUDA, ev::CudaEvent, progress=nothing) = __enqueue_wait(ev, CUDAdrv.CuDef
7676
# but which stream would we target?
7777
wait(::CUDA, ev::CPUEvent, progress=nothing) = wait(CPU(), ev, progress)
7878

79-
function __enqueue_wait(ev::CudaEvent, stream::CuStream)
80-
CUDAdrv.wait(ev.event, stream)
79+
function __waitall(::CUDA, dependencies, progress, stream)
80+
if dependencies isa Event
81+
dependencies = (dependencies,)
82+
end
83+
if dependencies !== nothing
84+
dependencies = collect(dependencies)
85+
cudadeps = filter(d->d isa CudaEvent, dependencies)
86+
otherdeps = filter(d->!(d isa CudaEvent), dependencies)
87+
for event in cudadeps
88+
CUDAdrv.wait(event.event, stream)
89+
end
90+
for event in otherdeps
91+
wait(CUDA(), event, progress)
92+
end
93+
end
94+
end
95+
96+
###
97+
# async_copy
98+
###
99+
# - IdDict does not free the memory
100+
# - WeakRef dict does not unique the key by objectid
101+
const __pinned_memory = Dict{UInt64, WeakRef}()
102+
103+
function __pin!(a)
104+
# use pointer instead of objectid?
105+
oid = objectid(a)
106+
if haskey(__pinned_memory, oid) && __pinned_memory[oid].value !== nothing
107+
return nothing
108+
end
109+
ad = Mem.register(Mem.Host, pointer(a), sizeof(a))
110+
finalizer(_ -> Mem.unregister(ad), a)
111+
__pinned_memory[oid] = WeakRef(a)
112+
return nothing
113+
end
114+
115+
function async_copy!(::CUDA, A, B; dependencies=nothing)
116+
A isa Array && __pin!(A)
117+
B isa Array && __pin!(B)
118+
119+
stream = next_stream()
120+
__waitall(CUDA(), dependencies, yield, stream)
121+
event = CuEvent(CUDAdrv.EVENT_DISABLE_TIMING)
122+
GC.@preserve A B begin
123+
destptr = pointer(A)
124+
srcptr = pointer(B)
125+
N = length(A)
126+
unsafe_copyto!(destptr, srcptr, N, async=true, stream=stream)
127+
end
128+
129+
CUDAdrv.record(event, stream)
130+
131+
return CudaEvent(event)
81132
end
82133

134+
135+
136+
###
137+
# Kernel launch
138+
###
83139
function (obj::Kernel{CUDA})(args...; ndrange=nothing, dependencies=nothing, workgroupsize=nothing)
84140
if ndrange isa Integer
85141
ndrange = (ndrange,)
@@ -92,18 +148,7 @@ function (obj::Kernel{CUDA})(args...; ndrange=nothing, dependencies=nothing, wor
92148
end
93149

94150
stream = next_stream()
95-
if dependencies !== nothing
96-
for event in dependencies
97-
if event isa CudaEvent
98-
__enqueue_wait(event, stream)
99-
end
100-
end
101-
for event in dependencies
102-
if !(event isa CudaEvent)
103-
wait(CUDA(), event, ()->yield())
104-
end
105-
end
106-
end
151+
__waitall(CUDA(), dependencies, yield, stream)
107152

108153
if KernelAbstractions.workgroupsize(obj) <: DynamicSize && workgroupsize === nothing
109154
# TODO: allow for NDRange{1, DynamicSize, DynamicSize}(nothing, nothing)

test/async_copy.jl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using KernelAbstractions, Test, CUDAapi
2+
if has_cuda_gpu()
3+
using CuArrays, CUDAdrv
4+
CuArrays.allowscalar(false)
5+
end
6+
7+
function copy_test(backend, ArrayT, M)
8+
A = ArrayT(rand(Float64, M))
9+
B = ArrayT(rand(Float64, M))
10+
11+
a = Array{Float64}(undef, M)
12+
event = async_copy!(backend, a, B, dependencies=Event(CPU()))
13+
event = async_copy!(backend, A, a, dependencies=event)
14+
wait(event)
15+
16+
@test isapprox(a, Array(A))
17+
@test isapprox(a, Array(B))
18+
end
19+
20+
M = 1024
21+
22+
if has_cuda_gpu()
23+
copy_test(CUDA(), CuArray, M)
24+
end
25+
copy_test(CPU(), CuArray, M)

0 commit comments

Comments
 (0)