Skip to content

Commit f29493f

Browse files
mqtt subscriber: add log for message properties
1 parent bca5dba commit f29493f

File tree

2 files changed

+23
-1
lines changed

2 files changed

+23
-1
lines changed

mqtt-broker/publisher.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mqttbroker
22

33
import (
44
"context"
5+
"strings"
56

67
mqtt "github.com/eclipse/paho.mqtt.golang"
78
"github.com/golangid/candi/candihelper"
@@ -27,6 +28,17 @@ func (p *publisher) PublishMessage(ctx context.Context, args *candishared.Publis
2728
trace, ctx := tracer.StartTraceWithContext(ctx, "MQTTBroker:PublishMessage")
2829
defer trace.Finish()
2930

31+
opReader := p.client.OptionsReader()
32+
var brokers []string
33+
for _, u := range opReader.Servers() {
34+
brokers = append(brokers, u.String())
35+
}
36+
trace.SetTag("brokers", strings.Join(brokers, ","))
37+
trace.SetTag("topic", args.Topic)
38+
trace.SetTag("key", args.Key)
39+
trace.Log("header", args.Header)
40+
trace.Log("message", args.Message)
41+
3042
var msg []byte
3143
if len(args.Message) > 0 {
3244
msg = args.Message

mqtt-broker/subscriber.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,22 @@ func (w *workerEngine) processMessage(_ mqtt.Client, m mqtt.Message) {
136136
w.wg.Done()
137137
}()
138138

139+
opReader := w.broker.client.OptionsReader()
140+
var brokers []string
141+
for _, u := range opReader.Servers() {
142+
brokers = append(brokers, u.String())
143+
}
144+
trace.SetTag("brokers", strings.Join(brokers, ","))
145+
trace.SetTag("qos", m.Qos())
146+
trace.SetTag("retained", m.Retained())
147+
trace.SetTag("client_id", opReader.ClientID())
148+
trace.Log("message_id", m.MessageID())
139149
if w.broker.WorkerType != MQTTBroker {
140150
trace.SetTag("worker_type", string(w.broker.WorkerType))
141151
}
142152
trace.SetTag("topic", m.Topic())
143153
trace.Log("params", params)
144-
trace.Log("body", m.Payload())
154+
trace.Log("message", m.Payload())
145155

146156
if w.opt.debugMode {
147157
log.Printf("\x1b[35;3mMQTT Subscriber%s: consuming message from topic '%s'\x1b[0m", getWorkerTypeLog(w.broker.WorkerType), m.Topic())

0 commit comments

Comments
 (0)