@@ -13,12 +13,7 @@ import (
13
13
"go.opentelemetry.io/otel/trace/noop"
14
14
)
15
15
16
- // Loader is the function type for loading data
17
- type Loader [K comparable , V any ] func (context.Context , []K ) []Result [V ]
18
-
19
- // Interface is a `DataLoader` Interface which defines a public API for loading data from a particular
20
- // data back-end with unique keys such as the `id` column of a SQL table or
21
- // document name in a MongoDB database, given a batch loading function.
16
+ // Interface defines a public API for loading data from a particular data source
22
17
type Interface [K comparable , V any ] interface {
23
18
// Load loads a single key
24
19
Load (context.Context , K ) Result [V ]
@@ -34,6 +29,9 @@ type Interface[K comparable, V any] interface {
34
29
Prime (ctx context.Context , key K , value V ) Interface [K , V ]
35
30
}
36
31
32
+ // Loader is the function type for loading data
33
+ type Loader [K comparable , V any ] func (context.Context , []K ) []Result [V ]
34
+
37
35
// config holds the configuration for DataLoader
38
36
type config struct {
39
37
// BatchSize is the number of keys to batch together, Default is 100
@@ -68,7 +66,6 @@ func New[K comparable, V any](loader Loader[K, V], options ...Option) Interface[
68
66
CacheSize : 1024 ,
69
67
CacheExpire : time .Minute ,
70
68
}
71
-
72
69
for _ , option := range options {
73
70
option (& config )
74
71
}
@@ -79,12 +76,10 @@ func New[K comparable, V any](loader Loader[K, V], options ...Option) Interface[
79
76
stopSchedule : make (chan struct {}),
80
77
}
81
78
dl .reset ()
82
-
83
79
// Create a cache if the cache size is greater than 0
84
80
if config .CacheSize > 0 {
85
81
dl .cache = expirable .NewLRU [K , V ](config .CacheSize , nil , config .CacheExpire )
86
82
}
87
-
88
83
return dl
89
84
}
90
85
@@ -197,7 +192,6 @@ func (d *dataLoader[K, V]) goLoad(ctx context.Context, key K) <-chan Result[V] {
197
192
// spawn a new goroutine to process the batch
198
193
go d .processBatch (ctx , d .batch , d .batchCtx , d .chs )
199
194
close (d .stopSchedule )
200
- // Create a new batch, and a new set of channels
201
195
d .reset ()
202
196
}
203
197
@@ -225,14 +219,13 @@ func (d *dataLoader[K, V]) scheduleBatch(ctx context.Context, stopSchedule <-cha
225
219
func (d * dataLoader [K , V ]) processBatch (ctx context.Context , keys []K , batchCtx []context.Context , chs map [K ][]chan Result [V ]) {
226
220
defer func () {
227
221
if r := recover (); r != nil {
228
- const size = 64 << 10
229
- buf := make ([]byte , size )
222
+ buf := make ([]byte , 64 << 10 )
230
223
buf = buf [:runtime .Stack (buf , false )]
231
224
err := fmt .Errorf ("dataloader: panic received in loader function: %v" , r )
232
225
fmt .Fprintf (os .Stderr , "%v\n %s" , err , buf )
233
226
234
227
for _ , chs := range chs {
235
- sendResult (chs , Result [V ]{err : err })
228
+ d . sendResult (chs , Result [V ]{err : err })
236
229
}
237
230
}
238
231
}()
@@ -259,32 +252,33 @@ func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, batchCtx
259
252
if results [i ].err == nil && d .cache != nil {
260
253
d .cache .Add (key , results [i ].data )
261
254
}
262
- sendResult (chs [key ], results [i ])
255
+ d . sendResult (chs [key ], results [i ])
263
256
}
264
257
}
265
258
266
- // reset resets the DataLoader
259
+ // reset resets the DataLoader state
267
260
func (d * dataLoader [K , V ]) reset () {
268
261
d .batch = make ([]K , 0 , d .config .BatchSize )
269
262
d .batchCtx = make ([]context.Context , 0 , d .config .BatchSize )
270
263
d .chs = make (map [K ][]chan Result [V ], d .config .BatchSize )
271
264
}
272
265
273
266
// sendResult sends a result to channels
274
- func sendResult [ V any ] (chs []chan Result [V ], result Result [V ]) {
267
+ func ( d * dataLoader [ K , V ]) sendResult (chs []chan Result [V ], result Result [V ]) {
275
268
for _ , ch := range chs {
276
269
ch <- result
277
270
close (ch )
278
271
}
279
272
}
280
273
274
+ var noopSpan = noop.Span {}
275
+
281
276
// startTrace starts a trace span
282
277
func (d * dataLoader [K , V ]) startTrace (ctx context.Context , name string , opts ... trace.SpanStartOption ) (context.Context , trace.Span ) {
283
278
if d .config .TracerProvider != nil {
284
- span := trace .SpanFromContext (ctx )
285
- if span .SpanContext ().IsValid () {
279
+ if span := trace .SpanFromContext (ctx ); span .SpanContext ().IsValid () {
286
280
return d .config .TracerProvider .Tracer ("dataLoader" ).Start (ctx , name , opts ... )
287
281
}
288
282
}
289
- return ctx , noop. Span {}
283
+ return ctx , noopSpan
290
284
}
0 commit comments