From 36cbcee501e87df3f9199ee78f89f9d7014fee5d Mon Sep 17 00:00:00 2001
From: Valentin Churavy
Date: Mon, 23 Sep 2024 16:24:08 +0200
Subject: [PATCH 1/4] Add KernelAbstractions-based mapreduce implementation

---
 src/host/mapreduce.jl | 186 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 182 insertions(+), 4 deletions(-)

diff --git a/src/host/mapreduce.jl b/src/host/mapreduce.jl
index ef9f3a31..e843bf13 100644
--- a/src/host/mapreduce.jl
+++ b/src/host/mapreduce.jl
@@ -2,10 +2,10 @@
 const AbstractArrayOrBroadcasted = Union{AbstractArray,Broadcast.Broadcasted}
 
-# GPUArrays' mapreduce methods build on `Base.mapreducedim!`, but with an additional
-# argument `init` value to avoid eager initialization of `R` (if set to something).
-mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArrayOrBroadcasted;
-              init=nothing) = error("Not implemented") # COV_EXCL_LINE
+# # GPUArrays' mapreduce methods build on `Base.mapreducedim!`, but with an additional
+# # argument `init` value to avoid eager initialization of `R` (if set to something).
+# mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArrayOrBroadcasted;
+#               init=nothing) = error("Not implemented") # COV_EXCL_LINE
 # resolve ambiguities
 Base.mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArray) = mapreducedim!(f, op, R, A)
 Base.mapreducedim!(f, op, R::AnyGPUArray, A::Broadcast.Broadcasted) = mapreducedim!(f, op, R, A)
@@ -142,3 +142,181 @@ function Base.:(==)(A::AnyGPUArray, B::AnyGPUArray)
     res = mapreduce(mapper, reducer, A, B; init=(; is_missing=false, is_equal=true))
     res.is_missing ? missing : res.is_equal
 end
+
+@inline function reduce_group(@context, op, val::T, neutral, ::Val{maxitems}) where {T, maxitems}
+    items = @groupsize[1]
+    item = @index(Local, Linear)
+
+    # local mem for a complete reduction
+    shared = @localmem T (maxitems,)
+    @inbounds shared[item] = val
+
+    # perform a reduction
+    d = 1
+    while d < items
+        @synchronize() # legal since cpu=false
+        index = 2 * d * (item-1) + 1
+        @inbounds if index <= items
+            other_val = if index + d <= items
+                shared[index+d]
+            else
+                neutral
+            end
+            shared[index] = op(shared[index], other_val)
+        end
+        d *= 2
+    end
+
+    # load the final value on the first item
+    if item == 1
+        val = @inbounds shared[item]
+    end
+
+    return val
+end
+
+Base.@propagate_inbounds _map_getindex(args::Tuple, I) = ((args[1][I]), _map_getindex(Base.tail(args), I)...)
+Base.@propagate_inbounds _map_getindex(args::Tuple{Any}, I) = ((args[1][I]),)
+Base.@propagate_inbounds _map_getindex(args::Tuple{}, I) = ()
+
+# Reduce an array across the grid. All elements to be processed can be addressed by the
+# product of the two iterators `Rreduce` and `Rother`, where the latter iterator will have
+# singleton entries for the dimensions that should be reduced (and vice versa).
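+# `R` carries an extra trailing dimension indexed by the reduction group, so each group
+# can write out its partial result independently; `neutral` is a neutral element of `op`
+# (when `nothing`, the seed is read from the existing value in `R` instead).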
+@kernel cpu=false function partial_mapreduce_device(f, op, neutral, maxitems, Rreduce, Rother, R, As...)
+    # decompose the 1D hardware indices into separate ones for reduction (across items
+    # and possibly groups if it doesn't fit) and other elements (remaining groups)
+    localIdx_reduce = @index(Local, Linear)
+    localDim_reduce = @groupsize()[1]
+    groupIdx_reduce, groupIdx_other = fldmod1(@index(Group, Linear), length(Rother))
+    numGroups = length(KernelAbstractions.blocks(KernelAbstractions.__iterspace(@context())))
+    groupDim_reduce = numGroups ÷ length(Rother)
+
+    # group-based indexing into the values outside of the reduction dimension
+    # (that means we can safely synchronize items within this group)
+    iother = groupIdx_other
+    @inbounds if iother <= length(Rother)
+        Iother = Rother[iother]
+
+        # load the neutral value
+        Iout = CartesianIndex(Tuple(Iother)..., groupIdx_reduce)
+        neutral = if neutral === nothing
+            R[Iout]
+        else
+            neutral
+        end
+
+        val = op(neutral, neutral)
+
+        # reduce serially across chunks of input vector that don't fit in a group
+        ireduce = localIdx_reduce + (groupIdx_reduce - 1) * localDim_reduce
+        while ireduce <= length(Rreduce)
+            Ireduce = Rreduce[ireduce]
+            J = max(Iother, Ireduce)
+            val = op(val, f(_map_getindex(As, J)...))
+            ireduce += localDim_reduce * groupDim_reduce
+        end
+
+        val = reduce_group(@context(), op, val, neutral, maxitems)
+
+        # write back to memory
+        if localIdx_reduce == 1
+            R[Iout] = val
+        end
+    end
+
+    return
+end
+
+## COV_EXCL_STOP
+
+function GPUArrays.mapreducedim!(f::F, op::OP, R::AnyGPUArray{T}, A::AbstractArrayOrBroadcasted;
+                                 init=nothing) where {F, OP, T}
+    Base.check_reducedims(R, A)
+    length(A) == 0 && return R # isempty(::Broadcasted) iterates
+
+    # add singleton dimensions to the output container, if needed
+    if ndims(R) < ndims(A)
+        dims = Base.fill_to_length(size(R), 1, Val(ndims(A)))
+        R = reshape(R, dims)
+    end
+
+    # iteration domain, split in two: one part covers the dimensions that should
+    # be reduced, and the other covers the rest. combining both covers all values.
+    Rall = CartesianIndices(axes(A))
+    Rother = CartesianIndices(axes(R))
+    Rreduce = CartesianIndices(ifelse.(axes(A) .== axes(R), Ref(Base.OneTo(1)), axes(A)))
+    # NOTE: we hard-code `OneTo` (`first.(axes(A))` would work too) or we get a
+    #       CartesianIndices object with UnitRanges that behave badly on the GPU.
+    @assert length(Rall) == length(Rother) * length(Rreduce)
+
+    # allocate an additional, empty dimension to write the reduced value to.
+    # this does not affect the actual location in memory of the final values,
+    # but allows us to write a generalized kernel supporting partial reductions.
+    R′ = reshape(R, (size(R)..., 1))
+
+    # how many items do we want?
+    #
+    # items in a group work together to reduce values across the reduction dimensions;
+    # we want as many as possible to improve algorithm efficiency and execution occupancy.
+    wanted_items = length(Rreduce)
+    function compute_items(max_items)
+        if wanted_items > max_items
+            max_items
+        else
+            wanted_items
+        end
+    end
+
+    # how many items can we launch?
+    #
+    # we might not be able to launch all those items to reduce each slice in one go.
+    # that's why each item also loops across its inputs, processing multiple values
+    # so that we can span the entire reduction dimension using a single item group.
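+    #
+    # as an illustration (numbers are arbitrary): reducing a 4096×100 input over its
+    # first dimension gives length(Rreduce) == 4096 and length(Rother) == 100; with
+    # 512 items per group, cld(4096, 512) == 8 groups are launched per slice, and the
+    # 8 partial results per slice are combined by a second, recursive invocation.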
+
+    # group size is restricted by local memory
+    # max_lmem_elements = compute_properties(device()).maxSharedLocalMemory ÷ sizeof(T)
+    # max_items = min(compute_properties(device()).maxTotalGroupSize,
+    #                 compute_items(max_lmem_elements ÷ 2))
+    # TODO: dynamic local memory to avoid two compilations
+
+    # let the driver suggest a group size
+    # args = (f, op, init, Val(max_items), Rreduce, Rother, R′, A)
+    # kernel_args = kernel_convert.(args)
+    # kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
+    # kernel = zefunction(partial_mapreduce_device, kernel_tt)
+    # reduce_items = launch_configuration(kernel)
+    reduce_items = 512
+    reduce_kernel = partial_mapreduce_device(get_backend(R), (reduce_items,))
+
+    # how many groups should we launch?
+    #
+    # even though we can always reduce each slice in a single item group, that may not be
+    # optimal as it might not saturate the GPU. we already launch some groups to process
+    # independent dimensions in parallel; pad that number to ensure full occupancy.
+    other_groups = length(Rother)
+    reduce_groups = cld(length(Rreduce), reduce_items)
+
+    # determine the launch configuration
+    items = reduce_items
+    groups = reduce_groups*other_groups
+
+    ndrange = groups*items
+
+    # perform the actual reduction
+    if reduce_groups == 1
+        # we can cover the dimensions to reduce using a single group
+        reduce_kernel(f, op, init, Val(items), Rreduce, Rother, R′, A; ndrange)
+    else
+        # we need multiple steps to cover all values to reduce
+        partial = similar(R, (size(R)..., reduce_groups))
+        if init === nothing
+            # without an explicit initializer we need to copy from the output container
+            partial .= R
+        end
+        reduce_kernel(f, op, init, Val(items), Rreduce, Rother, partial, A; ndrange)
+
+        GPUArrays.mapreducedim!(identity, op, R′, partial; init=init)
+    end
+
+    return R
+end

From d7f487bb4897a69fcec448007f66f6bf7c0a4fb3 Mon Sep 17 00:00:00 2001
From: Valentin Churavy
Date: Tue, 24 Sep 2024 10:33:42 +0200
Subject: [PATCH 2/4] fix syntax

---
 src/host/mapreduce.jl | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/host/mapreduce.jl b/src/host/mapreduce.jl
index e843bf13..729fec9f 100644
--- a/src/host/mapreduce.jl
+++ b/src/host/mapreduce.jl
@@ -143,8 +143,11 @@ function Base.:(==)(A::AnyGPUArray, B::AnyGPUArray)
     res.is_missing ? missing : res.is_equal
 end
 
+
+import KernelAbstractions: @context
+
 @inline function reduce_group(@context, op, val::T, neutral, ::Val{maxitems}) where {T, maxitems}
-    items = @groupsize[1]
+    items = @groupsize()[1]
     item = @index(Local, Linear)
 
     # local mem for a complete reduction
@@ -223,13 +226,11 @@ Base.@propagate_inbounds _map_getindex(args::Tuple{}, I) = ()
             R[Iout] = val
         end
     end
-
-    return
 end
 
 ## COV_EXCL_STOP
 
-function GPUArrays.mapreducedim!(f::F, op::OP, R::AnyGPUArray{T}, A::AbstractArrayOrBroadcasted;
+function mapreducedim!(f::F, op::OP, R::AnyGPUArray{T}, A::AbstractArrayOrBroadcasted;
                                  init=nothing) where {F, OP, T}
     Base.check_reducedims(R, A)
     length(A) == 0 && return R # isempty(::Broadcasted) iterates

From 2314e247693b6ae94166de9b587a0c82c8c25956 Mon Sep 17 00:00:00 2001
From: Valentin Churavy
Date: Tue, 24 Sep 2024 13:23:15 +0200
Subject: [PATCH 3/4] Use JLArray to test

---
 lib/JLArrays/src/JLArrays.jl | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/lib/JLArrays/src/JLArrays.jl b/lib/JLArrays/src/JLArrays.jl
index dbec3d25..2ccaea01 100644
--- a/lib/JLArrays/src/JLArrays.jl
+++ b/lib/JLArrays/src/JLArrays.jl
@@ -332,15 +332,6 @@ end
 
 Adapt.adapt_storage(::Adaptor, x::JLArray{T,N}) where {T,N} =
   JLDeviceArray{T,N}(x.data[], x.offset, x.dims)
 
-function GPUArrays.mapreducedim!(f, op, R::AnyJLArray, A::Union{AbstractArray,Broadcast.Broadcasted};
-                                 init=nothing)
-    if init !== nothing
-        fill!(R, init)
-    end
-    @allowscalar Base.reducedim!(op, typed_data(R), map(f, A))
-    R
-end
-
 ## KernelAbstractions interface
 
 KernelAbstractions.get_backend(a::JLA) where JLA <: JLArray = JLBackend()

From d103ed5d92b4220b94d850722ad0ccef9685302c Mon Sep 17 00:00:00 2001
From: Christian Guinard <28689358+christiangnrd@users.noreply.github.com>
Date: Sat, 28 Jun 2025 21:12:50 -0300
Subject: [PATCH 4/4] Implement some feedback

---
 src/host/mapreduce.jl | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/src/host/mapreduce.jl b/src/host/mapreduce.jl
index 729fec9f..b749ec08 100644
--- a/src/host/mapreduce.jl
+++ b/src/host/mapreduce.jl
@@ -185,7 +185,7 @@ Base.@propagate_inbounds _map_getindex(args::Tuple{}, I) = ()
 # Reduce an array across the grid. All elements to be processed can be addressed by the
 # product of the two iterators `Rreduce` and `Rother`, where the latter iterator will have
 # singleton entries for the dimensions that should be reduced (and vice versa).
-@kernel cpu=false function partial_mapreduce_device(f, op, neutral, maxitems, Rreduce, Rother, R, As...)
+@kernel cpu=false unsafe_indices=true function partial_mapreduce_device(f, op, neutral, maxitems, Rreduce, Rother, R, As...)
     # decompose the 1D hardware indices into separate ones for reduction (across items
     # and possibly groups if it doesn't fit) and other elements (remaining groups)
     localIdx_reduce = @index(Local, Linear)
@@ -250,11 +250,6 @@ function mapreducedim!(f::F, op::OP, R::AnyGPUArray{T}, A::AbstractArrayOrBroadc
     # CartesianIndices object with UnitRanges that behave badly on the GPU.
     @assert length(Rall) == length(Rother) * length(Rreduce)
 
-    # allocate an additional, empty dimension to write the reduced value to.
-    # this does not affect the actual location in memory of the final values,
-    # but allows us to write a generalized kernel supporting partial reductions.
-    R′ = reshape(R, (size(R)..., 1))
-
     # how many items do we want?
     #
     # items in a group work together to reduce values across the reduction dimensions;
     # we want as many as possible to improve algorithm efficiency and execution occupancy.
@@ -286,7 +281,7 @@ function mapreducedim!(f::F, op::OP, R::AnyGPUArray{T}, A::AbstractArrayOrBroadc
     # kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
     # kernel = zefunction(partial_mapreduce_device, kernel_tt)
     # reduce_items = launch_configuration(kernel)
-    reduce_items = 512
+    reduce_items = compute_items(512)
     reduce_kernel = partial_mapreduce_device(get_backend(R), (reduce_items,))
 
     # how many groups should we launch?
     #
     # even though we can always reduce each slice in a single item group, that may not be
     # optimal as it might not saturate the GPU. we already launch some groups to process
     # independent dimensions in parallel; pad that number to ensure full occupancy.
     other_groups = length(Rother)
     reduce_groups = cld(length(Rreduce), reduce_items)
 
     # determine the launch configuration
     items = reduce_items
     groups = reduce_groups*other_groups
 
     ndrange = groups*items
 
@@ -306,7 +301,7 @@ function mapreducedim!(f::F, op::OP, R::AnyGPUArray{T}, A::AbstractArrayOrBroadc
     # perform the actual reduction
     if reduce_groups == 1
         # we can cover the dimensions to reduce using a single group
-        reduce_kernel(f, op, init, Val(items), Rreduce, Rother, R′, A; ndrange)
+        reduce_kernel(f, op, init, Val(items), Rreduce, Rother, R, A; ndrange)
     else
         # we need multiple steps to cover all values to reduce
         partial = similar(R, (size(R)..., reduce_groups))
         if init === nothing
             # without an explicit initializer we need to copy from the output container
             partial .= R
         end
         reduce_kernel(f, op, init, Val(items), Rreduce, Rother, partial, A; ndrange)
 
-        GPUArrays.mapreducedim!(identity, op, R′, partial; init=init)
+        GPUArrays.mapreducedim!(identity, op, R, partial; init=init)
     end
 
     return R
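
A minimal sketch of how to exercise the new code path, assuming the four patches are
applied and using the in-repo JLArrays reference backend (the array shapes here are
arbitrary illustration, not part of the patches):

    using GPUArrays, JLArrays

    A = JLArray(rand(Float32, 4096, 100))

    sum(A; dims=1)         # partial reduction: dispatches to mapreducedim! with a 1×100 output
    mapreduce(abs2, +, A)  # full reduction through the same partial_mapreduce_device kernel

Because patch 3 deletes the scalar JLArrays override of GPUArrays.mapreducedim!, these
calls now go through the generic KernelAbstractions implementation, which is what makes
JLArray suitable for testing it.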