Skip to content

Commit cbe6ecb

Browse files
authored
Merge pull request #3 from Vinelab/pubsub
Add support for Google Cloud PubSub
2 parents 7530185 + 3b8c105 commit cbe6ecb

File tree

6 files changed

+133
-0
lines changed

6 files changed

+133
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ import "github.com/Vinelab/tracing-go/formats"
296296
spanCtx, err := Trace.Extract(&carrier, formats.TextMap)
297297
spanCtx, err := Trace.Extract(&carrier, formats.HTTP)
298298
spanCtx, err := Trace.Extract(&carrier, formats.AMQP)
299+
spanCtx, err := Trace.Extract(&carrier, formats.GooglePubSub)
299300
```
300301

301302
You may also add your own format using `RegisterExtractionFormat` method:
@@ -328,6 +329,7 @@ import "github.com/Vinelab/tracing-go/formats"
328329
err := Trace.Inject(&carrier, formats.TextMap)
329330
err := Trace.Inject(&carrier, formats.HTTP)
330331
err := Trace.Inject(&carrier, formats.AMQP)
332+
err := Trace.Inject(&carrier, formats.GooglePubSub)
331333
```
332334

333335
You may also add your own format using `RegisterInjectionFormat` method.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package zipkin
2+
3+
import (
4+
"log"
5+
6+
"cloud.google.com/go/pubsub"
7+
"github.com/Vinelab/tracing-go"
8+
"github.com/Vinelab/tracing-go/drivers/zipkin/propagation"
9+
)
10+
11+
// GooglePubSubExtractor manages trace extraction from Google PubSub carrier
12+
type GooglePubSubExtractor struct {
13+
TracerSetter
14+
}
15+
16+
// NewGooglePubSubExtractor returns the instance of AMQPExtractor
17+
func NewGooglePubSubExtractor() *GooglePubSubExtractor {
18+
return &GooglePubSubExtractor{}
19+
}
20+
21+
// Extract deserializes SpanContext from amqp.Delivery object
22+
func (extractor *GooglePubSubExtractor) Extract(carrier interface{}) (tracing.SpanContext, error) {
23+
msg, ok := carrier.(*pubsub.Message)
24+
25+
if !ok {
26+
log.Fatalf("Expected *pubsub.Message, got %T", carrier)
27+
}
28+
29+
rawCtx := extractor.Tracing.Extract(propagation.ExtractGooglePubSub(msg))
30+
return NewSpanContext(rawCtx), nil
31+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package zipkin
2+
3+
import (
4+
"log"
5+
6+
"cloud.google.com/go/pubsub"
7+
"github.com/Vinelab/tracing-go"
8+
"github.com/Vinelab/tracing-go/drivers/zipkin/propagation"
9+
"github.com/openzipkin/zipkin-go/model"
10+
)
11+
12+
// GooglePubSubInjector manages trace injection into Google Cloud PubSub carrier
13+
type GooglePubSubInjector struct {
14+
//
15+
}
16+
17+
// NewGooglePubSubInjector returns the instance of GooglePubSubInjector
18+
func NewGooglePubSubInjector() *GooglePubSubInjector {
19+
return &GooglePubSubInjector{}
20+
}
21+
22+
// Inject serialises given SpanContext into given amqp.Publishing object
23+
func (extractor *GooglePubSubInjector) Inject(spanCtx tracing.SpanContext, carrier interface{}) error {
24+
msg, ok := carrier.(*pubsub.Message)
25+
if !ok {
26+
log.Fatalf("Expected *pubsub.Message, got %T", carrier)
27+
}
28+
29+
rawCtx := spanCtx.RawContext()
30+
31+
zipkinCtx, ok := rawCtx.(model.SpanContext)
32+
if !ok {
33+
log.Fatalf("Expected %T, got %T", model.SpanContext{}, rawCtx)
34+
}
35+
36+
inject := propagation.InjectGooglePubSub(msg)
37+
return inject(zipkinCtx)
38+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package propagation
2+
3+
import (
4+
"cloud.google.com/go/pubsub"
5+
"github.com/openzipkin/zipkin-go/model"
6+
"github.com/openzipkin/zipkin-go/propagation"
7+
"github.com/openzipkin/zipkin-go/propagation/b3"
8+
)
9+
10+
// ExtractGooglePubSub will extract a span.Context from the Google Cloud PubSub message if found in B3 header format.
11+
func ExtractGooglePubSub(msg *pubsub.Message) propagation.Extractor {
12+
return func() (*model.SpanContext, error) {
13+
var (
14+
traceIDHeader = msg.Attributes[b3.TraceID]
15+
spanIDHeader = msg.Attributes[b3.SpanID]
16+
parentSpanIDHeader = msg.Attributes[b3.ParentSpanID]
17+
sampledHeader = msg.Attributes[b3.Sampled]
18+
flagsHeader = msg.Attributes[b3.Flags]
19+
)
20+
21+
return b3.ParseHeaders(
22+
traceIDHeader, spanIDHeader, parentSpanIDHeader, sampledHeader,
23+
flagsHeader,
24+
)
25+
}
26+
}
27+
28+
// InjectGooglePubSub will inject a span.Context into a Google Cloud PubSub message
29+
func InjectGooglePubSub(msg *pubsub.Message) propagation.Injector {
30+
return func(sc model.SpanContext) error {
31+
if (model.SpanContext{}) == sc {
32+
return b3.ErrEmptyContext
33+
}
34+
35+
if sc.Debug {
36+
msg.Attributes[b3.Flags] = "1"
37+
} else if sc.Sampled != nil {
38+
// Debug is encoded as X-B3-Flags: 1. Since Debug implies Sampled,
39+
// so don't also send "X-B3-Sampled: 1".
40+
if *sc.Sampled {
41+
msg.Attributes[b3.Sampled] = "1"
42+
} else {
43+
msg.Attributes[b3.Sampled] = "0"
44+
}
45+
}
46+
47+
if !sc.TraceID.Empty() && sc.ID > 0 {
48+
msg.Attributes[b3.TraceID] = sc.TraceID.String()
49+
msg.Attributes[b3.SpanID] = sc.ID.String()
50+
if sc.ParentID != nil {
51+
msg.Attributes[b3.ParentSpanID] = sc.ParentID.String()
52+
}
53+
}
54+
55+
return nil
56+
}
57+
}

drivers/zipkin/tracer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ func registerDefaultExtractionFormats() map[string]tracing.Extractor {
253253
extractionFormats[formats.TextMap] = NewTextMapExtractor()
254254
extractionFormats[formats.HTTP] = NewHTTPExtractor()
255255
extractionFormats[formats.AMQP] = NewAMQPExtractor()
256+
extractionFormats[formats.GooglePubSub] = NewGooglePubSubExtractor()
256257

257258
return extractionFormats
258259
}
@@ -263,6 +264,7 @@ func registerDefaultInjectionFormats() map[string]tracing.Injector {
263264
injectionFormats[formats.TextMap] = NewTextMapInjector()
264265
injectionFormats[formats.HTTP] = NewHTTPInjector()
265266
injectionFormats[formats.AMQP] = NewAMQPInjector()
267+
injectionFormats[formats.GooglePubSub] = NewGooglePubSubInjector()
266268

267269
return injectionFormats
268270
}

formats/formats.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ var (
99

1010
// AMQP is a format descriptor for propagating trace context via AMQP message
1111
AMQP = "amqp"
12+
13+
// GooglePubSub is a format descriptor for propagating trace context via Google Cloud PubSub message
14+
GooglePubSub = "google_pubsub"
1215
)

0 commit comments

Comments
 (0)