Skip to content

Commit e9a921d

Browse files
authored
perf(core): Implement parallel merge sorted algo (#9429)
``` goos: linux goarch: amd64 pkg: github.com/hypermodeinc/dgraph/v25/algo cpu: 11th Gen Intel(R) Core(TM) i7-1185G7 @ 3.00GHz BenchmarkMergeSorted/MergeSorted-8 4 349424640 ns/op 45692916 B/op 20059 allocs/op BenchmarkMergeSorted/MergeSortedParallel-8 9 173713513 ns/op 124697028 B/op 22970 allocs/op BenchmarkMergeSorted/MergeSortedNew-8 8 131558511 ns/op 52276942 B/op 20236 allocs/op ```
1 parent ae33cee commit e9a921d

File tree

2 files changed

+208
-23
lines changed

2 files changed

+208
-23
lines changed

algo/uidlist.go

Lines changed: 154 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package algo
88
import (
99
"container/heap"
1010
"sort"
11+
"sync"
1112

1213
"github.com/hypermodeinc/dgraph/v25/codec"
1314
"github.com/hypermodeinc/dgraph/v25/protos/pb"
@@ -360,41 +361,61 @@ func Difference(u, v *pb.List) *pb.List {
360361
return &pb.List{Uids: out}
361362
}
362363

363-
// MergeSorted merges sorted lists.
364-
func MergeSorted(lists []*pb.List) *pb.List {
364+
func MergeSortedMoreMem(lists []*pb.List) *pb.List {
365+
numThreads := 10
366+
if len(lists) > numThreads*numThreads {
367+
k := numThreads
368+
res := []*pb.List{}
369+
var wg sync.WaitGroup
370+
var mutex sync.Mutex
371+
wg.Add(k)
372+
for i := 0; i < k; i++ {
373+
go func() {
374+
defer wg.Done()
375+
end := (i + 1) * len(lists) / k
376+
if end > len(lists) {
377+
end = len(lists)
378+
}
379+
result := MergeSortedMoreMem(lists[i*len(lists)/k : end])
380+
mutex.Lock()
381+
res = append(res, result)
382+
mutex.Unlock()
383+
}()
384+
}
385+
wg.Wait()
386+
return MergeSortedMoreMem(res)
387+
} else {
388+
return internalMergeSort(lists)
389+
}
390+
}
391+
392+
func internalMergeSortWithBuffer(lists []*pb.List, buffer []uint64) *pb.List {
365393
if len(lists) == 0 {
366-
return new(pb.List)
394+
return &pb.List{Uids: buffer[:0]}
367395
}
368396

369397
h := &uint64Heap{}
370398
heap.Init(h)
371-
maxSz := 0
372399

373400
for i, l := range lists {
374-
if l == nil {
401+
if l == nil || len(l.Uids) == 0 {
375402
continue
376403
}
377-
lenList := len(l.Uids)
378-
if lenList > 0 {
379-
heap.Push(h, elem{
380-
val: l.Uids[0],
381-
listIdx: i,
382-
})
383-
if lenList > maxSz {
384-
maxSz = lenList
385-
}
386-
}
404+
heap.Push(h, elem{
405+
val: l.Uids[0],
406+
listIdx: i,
407+
})
387408
}
388409

389-
// Our final output. Give it an approximate capacity as copies are expensive.
390-
output := make([]uint64, 0, maxSz)
391-
// idx[i] is the element we are looking at for lists[i].
410+
// Use the provided buffer
411+
output := buffer[:0]
392412
idx := make([]int, len(lists))
393-
var last uint64 // Last element added to sorted / final output.
394-
for h.Len() > 0 { // While heap is not empty.
395-
me := (*h)[0] // Peek at the top element in heap.
413+
var last uint64
414+
415+
for h.Len() > 0 {
416+
me := (*h)[0]
396417
if len(output) == 0 || me.val != last {
397-
output = append(output, me.val) // Add if unique.
418+
output = append(output, me.val)
398419
last = me.val
399420
}
400421
l := lists[me.listIdx]
@@ -404,12 +425,122 @@ func MergeSorted(lists []*pb.List) *pb.List {
404425
idx[me.listIdx]++
405426
val := l.Uids[idx[me.listIdx]]
406427
(*h)[0].val = val
407-
heap.Fix(h, 0) // Faster than Pop() followed by Push().
428+
heap.Fix(h, 0)
408429
}
409430
}
431+
410432
return &pb.List{Uids: output}
411433
}
412434

435+
// MergeSorted merges sorted lists.
436+
func internalMergeSort(lists []*pb.List) *pb.List {
437+
sz := 0
438+
for _, l := range lists {
439+
if l == nil || len(l.Uids) == 0 {
440+
continue
441+
}
442+
sz += len(l.Uids)
443+
}
444+
buffer := make([]uint64, 0, sz)
445+
return internalMergeSortWithBuffer(lists, buffer)
446+
}
447+
448+
func MergeSorted(lists []*pb.List) *pb.List {
449+
// Calculate total capacity needed
450+
totalCap := 0
451+
for _, l := range lists {
452+
if l != nil {
453+
totalCap += len(l.Uids)
454+
}
455+
}
456+
457+
// Pre-allocate one big buffer
458+
bigBuffer := make([]uint64, totalCap)
459+
return mergeSortedWithBuffer(lists, bigBuffer)
460+
}
461+
462+
const numThreads = 10
463+
const numListPerThread = 10
464+
465+
func mergeSortedWithBuffer(lists []*pb.List, buffer []uint64) *pb.List {
466+
if len(lists) < numThreads*numListPerThread {
467+
return internalMergeSort(lists)
468+
}
469+
470+
// Calculate how much buffer each goroutine needs
471+
chunkSizes := make([]int, numThreads)
472+
totalNeeded := 0
473+
474+
chunkSize := (len(lists) + numThreads - 1) / numThreads
475+
476+
for i := 0; i < numThreads; i++ {
477+
start := i * chunkSize
478+
end := (i + 1) * chunkSize
479+
if end > len(lists) {
480+
end = len(lists)
481+
}
482+
483+
if start > len(lists) {
484+
continue
485+
}
486+
487+
chunkCap := 0
488+
for j := start; j < end; j++ {
489+
if lists[j] != nil {
490+
chunkCap += len(lists[j].Uids)
491+
}
492+
}
493+
chunkSizes[i] = chunkCap
494+
totalNeeded += chunkCap
495+
}
496+
497+
// Calculate buffer offsets for each goroutine
498+
bufferOffsets := make([]int, numThreads)
499+
bufferOffsets[0] = 0
500+
for i := 1; i < numThreads; i++ {
501+
bufferOffsets[i] = bufferOffsets[i-1] + chunkSizes[i-1]
502+
}
503+
504+
// Distribute buffer slices to each goroutine
505+
intermediateResults := make([]*pb.List, numThreads)
506+
var wg sync.WaitGroup
507+
508+
wg.Add(numThreads)
509+
for i := 0; i < numThreads; i++ {
510+
go func(idx int) {
511+
defer wg.Done()
512+
start := idx * chunkSize
513+
end := (idx + 1) * chunkSize
514+
if end > len(lists) {
515+
end = len(lists)
516+
}
517+
if start > len(lists) {
518+
return
519+
}
520+
521+
// Give this goroutine its slice of the big buffer
522+
bufferStart := bufferOffsets[idx]
523+
bufferEnd := bufferStart + chunkSizes[idx]
524+
goroutineBuffer := buffer[bufferStart:bufferEnd:bufferEnd][:0]
525+
result := internalMergeSortWithBuffer(lists[start:end], goroutineBuffer)
526+
intermediateResults[idx] = result
527+
}(i)
528+
}
529+
wg.Wait()
530+
531+
// Filter out nil results
532+
validResults := make([]*pb.List, 0, numThreads)
533+
for _, result := range intermediateResults {
534+
if result != nil && len(result.Uids) > 0 {
535+
validResults = append(validResults, result)
536+
}
537+
}
538+
539+
// Use the remaining part of buffer for final merge
540+
finalBuffer := make([]uint64, 0, totalNeeded)
541+
return internalMergeSortWithBuffer(validResults, finalBuffer)
542+
}
543+
413544
// IndexOf performs a binary search on the uids slice and returns the index at
414545
// which it finds the uid, else returns -1
415546
func IndexOf(u *pb.List, uid uint64) int {

algo/uidlist_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,60 @@ func TestMergeSorted6(t *testing.T) {
7272
[]uint64{1, 2, 11, 12, 13, 14, 15, 16, 17, 18, 20, 25})
7373
}
7474

75+
func BenchmarkMergeSorted(b *testing.B) {
76+
createList := func(n int) *pb.List {
77+
list := make([]uint64, n)
78+
for i := range list {
79+
list[i] = uint64(rand.Int63())
80+
}
81+
sort.Slice(list, func(i, j int) bool {
82+
return list[i] < list[j]
83+
})
84+
return &pb.List{Uids: list}
85+
}
86+
87+
input := []*pb.List{}
88+
for i := 0; i < 10000; i++ {
89+
input = append(input, createList(100))
90+
}
91+
92+
b.Run("MergeSorted", func(b *testing.B) {
93+
for i := 0; i < b.N; i++ {
94+
internalMergeSort(input)
95+
}
96+
})
97+
b.Run("MergeSortedParallel", func(b *testing.B) {
98+
for i := 0; i < b.N; i++ {
99+
MergeSortedMoreMem(input)
100+
}
101+
})
102+
b.Run("MergeSortedNew", func(b *testing.B) {
103+
for i := 0; i < b.N; i++ {
104+
MergeSorted(input)
105+
}
106+
})
107+
}
108+
109+
func TestMergeSortedRandom(t *testing.T) {
110+
createList := func(n int) *pb.List {
111+
list := make([]uint64, n)
112+
for i := range list {
113+
list[i] = uint64(rand.Int63())
114+
}
115+
sort.Slice(list, func(i, j int) bool {
116+
return list[i] < list[j]
117+
})
118+
return &pb.List{Uids: list}
119+
}
120+
121+
input := []*pb.List{}
122+
for i := 0; i < 1000; i++ {
123+
input = append(input, createList(1000))
124+
}
125+
126+
require.Equal(t, MergeSorted(input).Uids, internalMergeSort(input).Uids)
127+
}
128+
75129
func TestMergeSorted7(t *testing.T) {
76130
input := []*pb.List{
77131
newList([]uint64{5, 6, 7}),

0 commit comments

Comments
 (0)