Skip to content

Commit 8a4ca5b

Browse files
committed
chore: improve batchCtx
1 parent 4cb5583 commit 8a4ca5b

File tree

1 file changed

+7
-10
lines changed

1 file changed

+7
-10
lines changed

dataloader.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type dataLoader[K comparable, V any] struct {
5353
config config
5454
mu sync.Mutex
5555
batch []K
56-
batchCtx []context.Context
56+
batchCtx map[context.Context]struct{}
5757
chs map[K][]chan Result[V]
5858
stopSchedule chan struct{}
5959
}
@@ -167,7 +167,9 @@ func (d *dataLoader[K, V]) goLoad(ctx context.Context, key K) <-chan Result[V] {
167167
// Lock the DataLoader
168168
d.mu.Lock()
169169
if d.config.TracerProvider != nil {
170-
d.batchCtx = append(d.batchCtx, ctx)
170+
if _, ok := d.batchCtx[ctx]; !ok {
171+
d.batchCtx[ctx] = struct{}{}
172+
}
171173
}
172174

173175
if len(d.batch) == 0 {
@@ -216,7 +218,7 @@ func (d *dataLoader[K, V]) scheduleBatch(ctx context.Context, stopSchedule <-cha
216218
}
217219

218220
// processBatch processes a batch of keys
219-
func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, batchCtx []context.Context, chs map[K][]chan Result[V]) {
221+
func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, batchCtx map[context.Context]struct{}, chs map[K][]chan Result[V]) {
220222
defer func() {
221223
if r := recover(); r != nil {
222224
buf := make([]byte, 64<<10)
@@ -234,13 +236,8 @@ func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, batchCtx
234236
// Create a span with links to the batch contexts, which enables trace propagation
235237
// We should deduplicate identical batch contexts to avoid creating duplicate links.
236238
links := make([]trace.Link, 0, len(keys))
237-
seen := make(map[context.Context]struct{}, len(batchCtx))
238-
for _, bCtx := range batchCtx {
239-
if _, ok := seen[bCtx]; ok {
240-
continue
241-
}
239+
for bCtx := range batchCtx {
242240
links = append(links, trace.Link{SpanContext: trace.SpanContextFromContext(bCtx)})
243-
seen[bCtx] = struct{}{}
244241
}
245242
var span trace.Span
246243
ctx, span = d.startTrace(ctx, "dataLoader.Batch", trace.WithLinks(links...))
@@ -259,7 +256,7 @@ func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, batchCtx
259256
// reset resets the DataLoader state
260257
func (d *dataLoader[K, V]) reset() {
261258
d.batch = make([]K, 0, d.config.BatchSize)
262-
d.batchCtx = make([]context.Context, 0, d.config.BatchSize)
259+
d.batchCtx = make(map[context.Context]struct{}, d.config.BatchSize)
263260
d.chs = make(map[K][]chan Result[V], d.config.BatchSize)
264261
}
265262

0 commit comments

Comments
 (0)