Skip to content

Commit 06052c1

Browse files
add mapped dataloader fetch method
1 parent 7de6ebe commit 06052c1

File tree

5 files changed

+225
-38
lines changed

5 files changed

+225
-38
lines changed

.gitignore

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,29 @@
1+
# If you prefer the allow list template instead of the deny list, see community template:
2+
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
3+
#
4+
# Binaries for programs and plugins
5+
*.exe
6+
*.exe~
7+
*.dll
8+
*.so
9+
*.dylib
10+
11+
# Test binary, built with `go test -c`
12+
*.test
13+
14+
# Output of the go coverage tool, specifically when used with LiteIDE
15+
*.out
16+
17+
# Dependency directories (remove the comment below to include it)
18+
# vendor/
19+
20+
# Go workspace file
21+
# go.work
22+
go.work.sum
23+
24+
# env file
25+
.env
26+
27+
# Editors config
128
.vscode
29+
.idea

README.md

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66

77
It works as follows:
88
* A Loader object is created per graphql request.
9-
* Each of many concurrently executing graphql resolver functions call Load() on the Loader object with different keys. Let's say K1, K2, K3
10-
* Each call to Load() with a new key is delayed slightly (a few milliseconds) so that the Loader can load them together.
11-
* The customizable fetch function of the loader takes a list of keys and loads data for all of them in a single batched request to the data storage layer. It might send `[K1,K2,K3]` and get back `[V1,V2,V3]`.
9+
* Each of many concurrently executing graphql resolver functions call `Load()` on the Loader object with different keys. Let's say `K1`, `K2`, `K3`
10+
* Each call to `Load()` with a new key is delayed slightly (a few milliseconds) so that the Loader can load them together.
11+
* The customizable `fetch` function of the loader takes a list of keys and loads data for all of them in a single batched request to the data storage layer. It might send `[K1,K2,K3]` and get back `[V1,V2,V3]`. Alternatively, a `mappedFetch` function of the loader takes a list of keys and returns a map instead of a list. It might send `[K1, K2, K3]` and get back `{K1: V1, K2: V2, K3: V3}`.
1212
* The Loader takes care of sending the right result to the right caller and the result is cached for the duration of the graphql request.
1313

14+
> [!NOTE]
15+
> The `fetch` method expects the returned list to correspond to the provided keys, in the same order. Alternatively, the `mappedFetch` method can be used; it allows returning a map, which guarantees correct ordering and makes missing values easy to handle.
16+
1417
Usage:
1518

1619
```sh
@@ -25,26 +28,37 @@ import (
2528
"context"
2629
"fmt"
2730
"strconv"
28-
"time"
2931

3032
"github.com/vikstrous/dataloadgen"
3133
)
3234

33-
// fetchFn is shown as a function here, but it might work better as a method
35+
// fetchFn/mappedFetchFn is shown as a function here, but it might work better as a method
3436
// ctx is the context from the first call to Load for the current batch
3537
func fetchFn(ctx context.Context, keys []string) (ret []int, errs []error) {
3638
for _, key := range keys {
3739
num, err := strconv.ParseInt(key, 10, 32)
3840
ret = append(ret, int(num))
3941
errs = append(errs, err)
4042
}
41-
return
43+
return ret, errs
44+
}
45+
func mappedFetchFn(ctx context.Context, keys []string) (ret map[string]int, errs map[string]error) {
46+
for _, key := range keys {
47+
num, err := strconv.ParseInt(key, 10, 32)
48+
ret[key] = int(num)
49+
errs[key] = err
50+
}
51+
return ret, errs
4252
}
4353

4454
func main() {
4555
ctx := context.Background()
46-
// Per-request setup code:
56+
57+
// Per-request setup code. Either:
4758
loader := dataloadgen.NewLoader(fetchFn)
// or, equivalently, with the map-based fetch function:
// loader := dataloadgen.NewMappedLoader(mappedFetchFn)
61+
4862
// In every graphql resolver:
4963
result, err := loader.Load(ctx, "1")
5064
if err != nil {

benchmark/benchmarks_test.go

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,25 @@ func newVikstrous() *dataloadgen.Loader[int, benchmark.User] {
8989
)
9090
}
9191

92+
func newVikstrousMapped() *dataloadgen.Loader[int, benchmark.User] {
93+
return dataloadgen.NewMappedLoader(func(_ context.Context, keys []int) (map[int]benchmark.User, map[int]error) {
94+
users := make(map[int]benchmark.User, len(keys))
95+
errors := make(map[int]error, len(keys))
96+
97+
for _, key := range keys {
98+
if key%100 == 1 {
99+
errors[key] = fmt.Errorf("user not found")
100+
} else {
101+
users[key] = benchmark.User{ID: strconv.Itoa(key), Name: "user " + strconv.Itoa(key)}
102+
}
103+
}
104+
return users, errors
105+
},
106+
dataloadgen.WithBatchCapacity(100),
107+
dataloadgen.WithWait(500*time.Nanosecond),
108+
)
109+
}
110+
92111
func BenchmarkAll(b *testing.B) {
93112
ctx := context.Background()
94113

@@ -113,6 +132,11 @@ func BenchmarkAll(b *testing.B) {
113132
newVikstrous()
114133
}
115134
})
135+
b.Run("dataloadgen_mapped", func(b *testing.B) {
136+
for i := 0; i < b.N; i++ {
137+
newVikstrousMapped()
138+
}
139+
})
116140
})
117141

118142
b.Run("cached", func(b *testing.B) {
@@ -124,7 +148,7 @@ func BenchmarkAll(b *testing.B) {
124148
thunks[i] = dataloaderDL.Load(ctx, 1)
125149
}
126150
for i := 0; i < 100; i++ {
127-
thunks[i]()
151+
_, _ = thunks[i]()
128152
}
129153
}
130154
})
@@ -137,7 +161,7 @@ func BenchmarkAll(b *testing.B) {
137161
}
138162

139163
for i := 0; i < 100; i++ {
140-
thunks[i]()
164+
_, _ = thunks[i]()
141165
}
142166
}
143167
})
@@ -150,7 +174,7 @@ func BenchmarkAll(b *testing.B) {
150174
}
151175

152176
for i := 0; i < 100; i++ {
153-
thunks[i].Get(ctx)
177+
_, _ = thunks[i].Get(ctx)
154178
}
155179
}
156180
})
@@ -163,7 +187,20 @@ func BenchmarkAll(b *testing.B) {
163187
}
164188

165189
for i := 0; i < 100; i++ {
166-
thunks[i]()
190+
_, _ = thunks[i]()
191+
}
192+
}
193+
})
194+
b.Run("dataloadgen_mapped", func(b *testing.B) {
195+
for i := 0; i < b.N; i++ {
196+
vikstrousDL := newVikstrousMapped()
197+
thunks := make([]func() (benchmark.User, error), 100)
198+
for i := 0; i < 100; i++ {
199+
thunks[i] = vikstrousDL.LoadThunk(ctx, 1)
200+
}
201+
202+
for i := 0; i < 100; i++ {
203+
_, _ = thunks[i]()
167204
}
168205
}
169206
})
@@ -178,7 +215,7 @@ func BenchmarkAll(b *testing.B) {
178215
thunks[i] = dataloaderDL.Load(ctx, i)
179216
}
180217
for i := 0; i < 100; i++ {
181-
thunks[i]()
218+
_, _ = thunks[i]()
182219
}
183220
}
184221
})
@@ -191,7 +228,7 @@ func BenchmarkAll(b *testing.B) {
191228
}
192229

193230
for i := 0; i < 100; i++ {
194-
thunks[i]()
231+
_, _ = thunks[i]()
195232
}
196233
}
197234
})
@@ -204,7 +241,7 @@ func BenchmarkAll(b *testing.B) {
204241
}
205242

206243
for i := 0; i < 100; i++ {
207-
thunks[i].Get(ctx)
244+
_, _ = thunks[i].Get(ctx)
208245
}
209246
}
210247
})
@@ -217,7 +254,20 @@ func BenchmarkAll(b *testing.B) {
217254
}
218255

219256
for i := 0; i < 100; i++ {
220-
thunks[i]()
257+
_, _ = thunks[i]()
258+
}
259+
}
260+
})
261+
b.Run("dataloadgen_mapped", func(b *testing.B) {
262+
for i := 0; i < b.N; i++ {
263+
vikstrousDL := newVikstrousMapped()
264+
thunks := make([]func() (benchmark.User, error), 100)
265+
for i := 0; i < 100; i++ {
266+
thunks[i] = vikstrousDL.LoadThunk(ctx, i)
267+
}
268+
269+
for i := 0; i < 100; i++ {
270+
_, _ = thunks[i]()
221271
}
222272
}
223273
})
@@ -295,11 +345,29 @@ func BenchmarkAll(b *testing.B) {
295345
wg.Wait()
296346
}
297347
})
348+
b.Run("dataloadgen_mapped", func(b *testing.B) {
349+
for n := 0; n < b.N*10; n++ {
350+
vikstrousDL := newVikstrousMapped()
351+
results := make([]benchmark.User, 10)
352+
var wg sync.WaitGroup
353+
for i := 0; i < 10; i++ {
354+
wg.Add(1)
355+
go func(i int) {
356+
for j := 0; j < b.N; j++ {
357+
u, _ := vikstrousDL.Load(ctx, i)
358+
results[i] = u
359+
}
360+
wg.Done()
361+
}(i)
362+
}
363+
wg.Wait()
364+
}
365+
})
298366
})
299367

300368
b.Run("all in one request", func(b *testing.B) {
301369
b.Run("dataloader", func(b *testing.B) {
302-
keys := []int{}
370+
var keys []int
303371
for n := 0; n < 10000; n++ {
304372
keys = append(keys, n)
305373
}
@@ -310,7 +378,7 @@ func BenchmarkAll(b *testing.B) {
310378
}
311379
})
312380
b.Run("dataloaden", func(b *testing.B) {
313-
keys := []int{}
381+
var keys []int
314382
for n := 0; n < 10000; n++ {
315383
keys = append(keys, n)
316384
}
@@ -321,7 +389,7 @@ func BenchmarkAll(b *testing.B) {
321389
}
322390
})
323391
b.Run("yckao_dataloader", func(b *testing.B) {
324-
keys := []int{}
392+
var keys []int
325393
for n := 0; n < 10000; n++ {
326394
keys = append(keys, n)
327395
}
@@ -330,19 +398,30 @@ func BenchmarkAll(b *testing.B) {
330398
yckaoDL := newYckao()
331399
thunks := yckaoDL.LoadMany(ctx, keys)
332400
for _, t := range thunks {
333-
t.Get(ctx)
401+
_, _ = t.Get(ctx)
334402
}
335403
}
336404
})
337405
b.Run("dataloadgen", func(b *testing.B) {
338-
keys := []int{}
406+
var keys []int
339407
for n := 0; n < 10000; n++ {
340408
keys = append(keys, n)
341409
}
342410
b.ResetTimer()
343411
for i := 0; i < b.N; i++ {
344412
vikstrousDL := newVikstrous()
345-
vikstrousDL.LoadAll(ctx, keys)
413+
_, _ = vikstrousDL.LoadAll(ctx, keys)
414+
}
415+
})
416+
b.Run("dataloadgen_mapped", func(b *testing.B) {
417+
var keys []int
418+
for n := 0; n < 10000; n++ {
419+
keys = append(keys, n)
420+
}
421+
b.ResetTimer()
422+
for i := 0; i < b.N; i++ {
423+
vikstrousDL := newVikstrousMapped()
424+
_, _ = vikstrousDL.LoadAll(ctx, keys)
346425
}
347426
})
348427
})

dataloadgen.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"sync"
88
"time"
99

10-
trace "go.opentelemetry.io/otel/trace"
10+
"go.opentelemetry.io/otel/trace"
1111
)
1212

1313
// Option allows for configuration of loader fields.
@@ -51,6 +51,25 @@ func NewLoader[KeyT comparable, ValueT any](fetch func(ctx context.Context, keys
5151
return l
5252
}
5353

54+
// NewMappedLoader creates a new GenericLoader given a mappedFetch, wait and maxBatch
55+
func NewMappedLoader[KeyT comparable, ValueT any](mappedFetch func(ctx context.Context, keys []KeyT) (map[KeyT]ValueT, map[KeyT]error), options ...Option) *Loader[KeyT, ValueT] {
56+
return NewLoader(convertMappedFetch(mappedFetch), options...)
57+
}
58+
59+
// convertMappedFetch accepts a fetcher method that returns maps, and converts it to a fetcher that returns lists.
60+
func convertMappedFetch[KeyT comparable, ValueT any](mappedFetch func(ctx context.Context, keys []KeyT) (map[KeyT]ValueT, map[KeyT]error)) func(ctx context.Context, keys []KeyT) ([]ValueT, []error) {
61+
return func(ctx context.Context, keys []KeyT) ([]ValueT, []error) {
62+
mappedResults, mappedErrs := mappedFetch(ctx, keys)
63+
var values = make([]ValueT, len(keys))
64+
var errs = make([]error, len(keys))
65+
for i, key := range keys {
66+
values[i] = mappedResults[key]
67+
errs[i] = mappedErrs[key]
68+
}
69+
return values, errs
70+
}
71+
}
72+
5473
type loaderConfig struct {
5574
// how long to wait before sending a batch
5675
wait time.Duration
@@ -177,18 +196,18 @@ func (l *Loader[KeyT, ValueT]) LoadAll(ctx context.Context, keys []KeyT) ([]Valu
177196
}
178197

179198
values := make([]ValueT, len(keys))
180-
errors := make([]error, len(keys))
199+
errs := make([]error, len(keys))
181200
allNil := true
182201
for i, thunk := range thunks {
183-
values[i], errors[i] = thunk()
184-
if errors[i] != nil {
202+
values[i], errs[i] = thunk()
203+
if errs[i] != nil {
185204
allNil = false
186205
}
187206
}
188207
if allNil {
189208
return values, nil
190209
}
191-
return values, ErrorSlice(errors)
210+
return values, ErrorSlice(errs)
192211
}
193212

194213
// LoadAllThunk returns a function that when called will block waiting for a ValueT.
@@ -201,18 +220,18 @@ func (l *Loader[KeyT, ValueT]) LoadAllThunk(ctx context.Context, keys []KeyT) fu
201220
}
202221
return func() ([]ValueT, error) {
203222
values := make([]ValueT, len(keys))
204-
errors := make([]error, len(keys))
223+
errs := make([]error, len(keys))
205224
allNil := true
206225
for i, thunk := range thunks {
207-
values[i], errors[i] = thunk()
208-
if allNil && errors[i] != nil {
226+
values[i], errs[i] = thunk()
227+
if allNil && errs[i] != nil {
209228
allNil = false
210229
}
211230
}
212231
if allNil {
213232
return values, nil
214233
}
215-
return values, ErrorSlice(errors)
234+
return values, ErrorSlice(errs)
216235
}
217236
}
218237

@@ -296,7 +315,7 @@ func (l *Loader[KeyT, ValueT]) safeFetch(ctx context.Context, keys []KeyT) (valu
296315
return l.fetch(ctx, keys)
297316
}
298317

299-
// addKeyToBatch will return the location of the key in the batch, if its not found
318+
// addKeyToBatch will return the location of the key in the batch, if it's not found
300319
// it will add the key to the batch
301320
func (l *Loader[KeyT, ValueT]) addKeyToBatch(b *loaderBatch[KeyT, ValueT], key KeyT) int {
302321
pos := len(b.keys)

0 commit comments

Comments
 (0)