Skip to content

Commit f2af6a2

Browse files
committed
feat: add trace with links support
1 parent f9bdeb0 commit f2af6a2

File tree

6 files changed

+216
-13
lines changed

6 files changed

+216
-13
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Feature
1717
- Use hashicorp/golang-lru to cache the loaded values.
1818
- Can be used to batch and cache multiple requests.
1919
- Deduplicate identical requests, reducing the number of requests.
20+
- Support OpenTelemetry, trace batched requests with OpenTelemetry/Link.
2021

2122
Installation
2223
---

dataloader.go

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"time"
1010

1111
"github.com/hashicorp/golang-lru/v2/expirable"
12+
"go.opentelemetry.io/otel/trace"
13+
"go.opentelemetry.io/otel/trace/noop"
1214
)
1315

1416
// Loader is the function type for loading data
@@ -17,11 +19,6 @@ type Loader[K comparable, V any] func(context.Context, []K) []Result[V]
1719
// Interface is a `DataLoader` Interface which defines a public API for loading data from a particular
1820
// data back-end with unique keys such as the `id` column of a SQL table or
1921
// document name in a MongoDB database, given a batch loading function.
20-
//
21-
// Each `DataLoader` instance should contain a unique memoized cache. Use caution when
22-
// used in long-lived applications or those which serve many users with
23-
// different access permissions and consider creating a new instance per
24-
// web request.
2522
type Interface[K comparable, V any] interface {
2623
// Load loads a single key
2724
Load(context.Context, K) Result[V]
@@ -47,6 +44,8 @@ type config struct {
4744
CacheSize int
4845
// CacheExpire is the duration to expire cache items, Default is 1 minute
4946
CacheExpire time.Duration
47+
// TracerProvider is the tracer provider to use for tracing
48+
TracerProvider trace.TracerProvider
5049
}
5150

5251
// dataLoader is the main struct for the dataloader
@@ -56,6 +55,7 @@ type dataLoader[K comparable, V any] struct {
5655
config config
5756
mu sync.Mutex
5857
batch []K
58+
batchCtx []context.Context
5959
chs map[K][]chan Result[V]
6060
stopSchedule chan struct{}
6161
}
@@ -90,11 +90,17 @@ func New[K comparable, V any](loader Loader[K, V], options ...Option) Interface[
9090

9191
// Load loads a single key
9292
func (d *dataLoader[K, V]) Load(ctx context.Context, key K) Result[V] {
93+
ctx, span := d.startTrace(ctx, "dataLoader.Load")
94+
defer span.End()
95+
9396
return <-d.goLoad(ctx, key)
9497
}
9598

9699
// LoadMany loads multiple keys
97100
func (d *dataLoader[K, V]) LoadMany(ctx context.Context, keys []K) []Result[V] {
101+
ctx, span := d.startTrace(ctx, "dataLoader.LoadMany")
102+
defer span.End()
103+
98104
chs := make([]<-chan Result[V], len(keys))
99105
for i, key := range keys {
100106
chs[i] = d.goLoad(ctx, key)
@@ -104,12 +110,14 @@ func (d *dataLoader[K, V]) LoadMany(ctx context.Context, keys []K) []Result[V] {
104110
for i, ch := range chs {
105111
results[i] = <-ch
106112
}
107-
108113
return results
109114
}
110115

111116
// LoadMap loads multiple keys and returns a map of results
112117
func (d *dataLoader[K, V]) LoadMap(ctx context.Context, keys []K) map[K]Result[V] {
118+
ctx, span := d.startTrace(ctx, "dataLoader.LoadMap")
119+
defer span.End()
120+
113121
chs := make([]<-chan Result[V], len(keys))
114122
for i, key := range keys {
115123
chs[i] = d.goLoad(ctx, key)
@@ -167,6 +175,10 @@ func (d *dataLoader[K, V]) goLoad(ctx context.Context, key K) <-chan Result[V] {
167175

168176
// Lock the DataLoader
169177
d.mu.Lock()
178+
if d.config.TracerProvider != nil {
179+
d.batchCtx = append(d.batchCtx, ctx)
180+
}
181+
170182
if len(d.batch) == 0 {
171183
// If there are no keys in the current batch, schedule a new batch timer
172184
d.stopSchedule = make(chan struct{})
@@ -187,7 +199,7 @@ func (d *dataLoader[K, V]) goLoad(ctx context.Context, key K) <-chan Result[V] {
187199
// If the current batch is full, start processing it
188200
if len(d.batch) >= d.config.BatchSize {
189201
// spawn a new goroutine to process the batch
190-
go d.processBatch(ctx, d.batch, d.chs)
202+
go d.processBatch(ctx, d.batch, d.batchCtx, d.chs)
191203
close(d.stopSchedule)
192204
// Create a new batch, and a new set of channels
193205
d.reset()
@@ -205,7 +217,7 @@ func (d *dataLoader[K, V]) scheduleBatch(ctx context.Context, stopSchedule <-cha
205217
case <-time.After(d.config.Wait):
206218
d.mu.Lock()
207219
if len(d.batch) > 0 {
208-
go d.processBatch(ctx, d.batch, d.chs)
220+
go d.processBatch(ctx, d.batch, d.batchCtx, d.chs)
209221
d.reset()
210222
}
211223
d.mu.Unlock()
@@ -215,7 +227,7 @@ func (d *dataLoader[K, V]) scheduleBatch(ctx context.Context, stopSchedule <-cha
215227
}
216228

217229
// processBatch processes a batch of keys
218-
func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, chs map[K][]chan Result[V]) {
230+
func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, batchCtx []context.Context, chs map[K][]chan Result[V]) {
219231
defer func() {
220232
if r := recover(); r != nil {
221233
const size = 64 << 10
@@ -233,20 +245,37 @@ func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, chs map[K
233245
return
234246
}
235247
}()
236-
results := d.loader(ctx, keys)
237248

249+
if d.config.TracerProvider != nil {
250+
// Create a span with links to the batch contexts, which enables trace propagation
251+
// We should deduplicate identical batch contexts to avoid creating duplicate links.
252+
links := make([]trace.Link, 0, len(keys))
253+
seen := make(map[context.Context]struct{}, len(batchCtx))
254+
for _, bCtx := range batchCtx {
255+
if _, ok := seen[bCtx]; ok {
256+
continue
257+
}
258+
links = append(links, trace.Link{SpanContext: trace.SpanContextFromContext(bCtx)})
259+
seen[bCtx] = struct{}{}
260+
}
261+
var span trace.Span
262+
ctx, span = d.config.TracerProvider.Tracer("dataLoader").Start(ctx, "dataLoader.Batch", trace.WithLinks(links...))
263+
defer span.End()
264+
}
265+
266+
results := d.loader(ctx, keys)
238267
for i, key := range keys {
239268
if results[i].err == nil && d.cache != nil {
240269
d.cache.Add(key, results[i].data)
241270
}
242-
243271
sendResult(chs[key], results[i])
244272
}
245273
}
246274

247275
// reset resets the DataLoader
248276
func (d *dataLoader[K, V]) reset() {
249277
d.batch = make([]K, 0, d.config.BatchSize)
278+
d.batchCtx = make([]context.Context, 0, d.config.BatchSize)
250279
d.chs = make(map[K][]chan Result[V], d.config.BatchSize)
251280
}
252281

@@ -257,3 +286,11 @@ func sendResult[V any](chs []chan Result[V], result Result[V]) {
257286
close(ch)
258287
}
259288
}
289+
290+
// startTrace starts a trace span
291+
func (d *dataLoader[K, V]) startTrace(ctx context.Context, name string) (context.Context, trace.Span) {
292+
if d.config.TracerProvider != nil {
293+
return d.config.TracerProvider.Tracer("dataLoader").Start(ctx, name)
294+
}
295+
return ctx, noop.Span{}
296+
}

dataloader_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import (
66
"sync"
77
"testing"
88
"time"
9+
10+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
11+
"go.opentelemetry.io/otel/sdk/trace/tracetest"
912
)
1013

1114
func TestDataLoader(t *testing.T) {
@@ -23,6 +26,117 @@ func TestDataLoader(t *testing.T) {
2326
t.Run("Prime", testPrime)
2427
t.Run("Inflight", testInflight)
2528
t.Run("Schedule batch", testScheduleBatch)
29+
t.Run("Trace", testTrace)
30+
}
31+
32+
func testTrace(t *testing.T) {
33+
exporter := tracetest.NewInMemoryExporter()
34+
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))
35+
36+
loader := New(func(ctx context.Context, keys []int) []Result[string] {
37+
results := make([]Result[string], len(keys))
38+
for i, key := range keys {
39+
results[i] = Result[string]{data: fmt.Sprintf("Result for %d", key)}
40+
}
41+
return results
42+
},
43+
WithTracerProvider(tp),
44+
)
45+
46+
{
47+
ctx, span := tp.Tracer("dataLoader").Start(context.Background(), "test")
48+
defer span.End()
49+
50+
wg := sync.WaitGroup{}
51+
for i := 0; i < 10; i++ {
52+
wg.Add(1)
53+
go func(i int) {
54+
defer wg.Done()
55+
_ = loader.Load(ctx, i/2)
56+
}(i)
57+
}
58+
wg.Wait()
59+
60+
spans := exporter.GetSpans()
61+
if len(spans.Snapshots()) != 11 {
62+
t.Errorf("Expected 11 spans, got %d", len(spans.Snapshots()))
63+
}
64+
65+
if spans.Snapshots()[0].Name() != "dataLoader.Batch" {
66+
t.Errorf("Unexpected span name: %v", spans.Snapshots()[0].Name())
67+
}
68+
69+
if len(spans.Snapshots()[0].Links()) != 10 {
70+
t.Errorf("Expected 10 links, got %d", len(spans.Snapshots()[0].Links()))
71+
}
72+
73+
if spans.Snapshots()[1].Name() != "dataLoader.Load" {
74+
t.Errorf("Unexpected span name: %v", spans.Snapshots()[0].Name())
75+
}
76+
77+
if len(spans.Snapshots()[1].Links()) != 0 {
78+
t.Errorf("Expected 3 links, got %d", len(spans.Snapshots()[0].Links()))
79+
}
80+
81+
exporter.Reset()
82+
}
83+
{
84+
ctx, span := tp.Tracer("dataLoader").Start(context.Background(), "test")
85+
defer span.End()
86+
87+
_ = loader.LoadMany(ctx, []int{1, 2, 3, 9})
88+
spans := exporter.GetSpans()
89+
90+
if len(spans.Snapshots()) != 2 {
91+
t.Errorf("Expected 11 spans, got %d", len(spans.Snapshots()))
92+
}
93+
94+
if spans.Snapshots()[0].Name() != "dataLoader.Batch" {
95+
t.Errorf("Unexpected span name: %v", spans.Snapshots()[0].Name())
96+
}
97+
98+
if len(spans.Snapshots()[0].Links()) != 1 {
99+
t.Errorf("Expected 1 links, got %d", len(spans.Snapshots()[0].Links()))
100+
}
101+
102+
if spans.Snapshots()[1].Name() != "dataLoader.LoadMany" {
103+
t.Errorf("Unexpected span name: %v", spans.Snapshots()[0].Name())
104+
}
105+
106+
if len(spans.Snapshots()[1].Links()) != 0 {
107+
t.Errorf("Expected 0 links, got %d", len(spans.Snapshots()[0].Links()))
108+
}
109+
110+
exporter.Reset()
111+
}
112+
{
113+
ctx, span := tp.Tracer("dataLoader").Start(context.Background(), "test")
114+
defer span.End()
115+
116+
loader.LoadMap(ctx, []int{3, 14, 14, 15, 16})
117+
118+
spans := exporter.GetSpans()
119+
120+
if len(spans.Snapshots()) != 2 {
121+
t.Errorf("Expected 11 spans, got %d", len(spans.Snapshots()))
122+
}
123+
124+
if spans.Snapshots()[0].Name() != "dataLoader.Batch" {
125+
t.Errorf("Unexpected span name: %v", spans.Snapshots()[0].Name())
126+
}
127+
128+
if len(spans.Snapshots()[0].Links()) != 1 {
129+
t.Errorf("Expected 1 links, got %d", len(spans.Snapshots()[0].Links()))
130+
}
131+
132+
if spans.Snapshots()[1].Name() != "dataLoader.LoadMap" {
133+
t.Errorf("Unexpected span name: %v", spans.Snapshots()[0].Name())
134+
}
135+
136+
if len(spans.Snapshots()[1].Links()) != 0 {
137+
t.Errorf("Expected 0 links, got %d", len(spans.Snapshots()[0].Links()))
138+
}
139+
}
26140
}
27141

28142
func testScheduleBatch(t *testing.T) {

go.mod

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,17 @@ module github.com/sysulq/dataloader-go
22

33
go 1.22.4
44

5-
require github.com/hashicorp/golang-lru/v2 v2.0.7
5+
require (
6+
github.com/hashicorp/golang-lru/v2 v2.0.7
7+
go.opentelemetry.io/otel/sdk v1.28.0
8+
go.opentelemetry.io/otel/trace v1.28.0
9+
)
10+
11+
require (
12+
github.com/go-logr/logr v1.4.2 // indirect
13+
github.com/go-logr/stdr v1.2.2 // indirect
14+
github.com/google/uuid v1.6.0 // indirect
15+
go.opentelemetry.io/otel v1.28.0 // indirect
16+
go.opentelemetry.io/otel/metric v1.28.0 // indirect
17+
golang.org/x/sys v0.21.0 // indirect
18+
)

go.sum

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,29 @@
1+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
4+
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
5+
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
6+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
7+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
8+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
9+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
10+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
11+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
112
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
213
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
14+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
15+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
16+
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
17+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
18+
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
19+
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
20+
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
21+
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
22+
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
23+
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
24+
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
25+
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
26+
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
27+
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
28+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
29+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

option.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package dataloader
22

3-
import "time"
3+
import (
4+
"time"
5+
6+
"go.opentelemetry.io/otel/trace"
7+
)
48

59
// Option is a function type for configuring DataLoader
610
type Option func(*config)
@@ -26,3 +30,10 @@ func WithWait(wait time.Duration) Option {
2630
c.Wait = wait
2731
}
2832
}
33+
34+
// WithTracerProvider sets the tracer for the DataLoader
35+
func WithTracerProvider(tp trace.TracerProvider) Option {
36+
return func(c *config) {
37+
c.TracerProvider = tp
38+
}
39+
}

0 commit comments

Comments
 (0)