Skip to content

Commit d689016

Browse files
arvindh123dborovcanin
authored andcommitted
use NATS mqtt broker
Signed-off-by: Arvindh <arvindh91@gmail.com>
1 parent cac6c64 commit d689016

File tree

7 files changed

+110
-265
lines changed

7 files changed

+110
-265
lines changed

cmd/mqtt/main.go

Lines changed: 1 addition & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
smqlog "github.com/absmach/supermq/logger"
2727
"github.com/absmach/supermq/mqtt"
2828
"github.com/absmach/supermq/mqtt/events"
29-
mqtttracing "github.com/absmach/supermq/mqtt/tracing"
3029
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
3130
"github.com/absmach/supermq/pkg/errors"
3231
"github.com/absmach/supermq/pkg/grpcclient"
@@ -36,7 +35,6 @@ import (
3635
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
3736
msgevents "github.com/absmach/supermq/pkg/messaging/events"
3837
"github.com/absmach/supermq/pkg/messaging/handler"
39-
mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt"
4038
"github.com/absmach/supermq/pkg/server"
4139
"github.com/absmach/supermq/pkg/uuid"
4240
"github.com/caarlos0/env/v11"
@@ -135,31 +133,6 @@ func main() {
135133
}()
136134
tracer := tp.Tracer(svcName)
137135

138-
bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
139-
if err != nil {
140-
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
141-
exitCode = 1
142-
return
143-
}
144-
defer bsub.Close()
145-
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)
146-
147-
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTTargetUsername, cfg.MQTTTargetPassword, cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
148-
if err != nil {
149-
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
150-
exitCode = 1
151-
return
152-
}
153-
defer mpub.Close()
154-
155-
fwd := mqtt.NewForwarder(brokers.SubjectAllMessages, logger)
156-
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllMessages)
157-
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
158-
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
159-
exitCode = 1
160-
return
161-
}
162-
163136
np, err := brokers.NewPublisher(ctx, cfg.BrokerURL)
164137
if err != nil {
165138
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
@@ -251,9 +224,7 @@ func main() {
251224
go chc.CallHome(ctx)
252225
}
253226

254-
beforeHandler := beforeHandler{
255-
resolver: messaging.NewTopicResolver(channelsClient, domainsClient),
256-
}
227+
beforeHandler := mqtt.NewBeforeHandler(messaging.NewTopicResolver(channelsClient, domainsClient), parser, logger)
257228

258229
afterHandler := afterHandler{
259230
username: cfg.MQTTTargetUsername,
@@ -382,42 +353,3 @@ func (ah afterHandler) Intercept(ctx context.Context, pkt packets.ControlPacket,
382353

383354
return pkt, nil
384355
}
385-
386-
type beforeHandler struct {
387-
resolver messaging.TopicResolver
388-
}
389-
390-
// This interceptor is used to replace domain and channel routes with relevant domain and channel IDs in the message topic.
391-
func (bh beforeHandler) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) {
392-
switch pt := pkt.(type) {
393-
case *packets.SubscribePacket:
394-
for i, topic := range pt.Topics {
395-
ft, err := bh.resolver.ResolveTopic(ctx, topic)
396-
if err != nil {
397-
return nil, err
398-
}
399-
pt.Topics[i] = ft
400-
}
401-
402-
return pt, nil
403-
case *packets.UnsubscribePacket:
404-
for i, topic := range pt.Topics {
405-
ft, err := bh.resolver.ResolveTopic(ctx, topic)
406-
if err != nil {
407-
return nil, err
408-
}
409-
pt.Topics[i] = ft
410-
}
411-
return pt, nil
412-
case *packets.PublishPacket:
413-
ft, err := bh.resolver.ResolveTopic(ctx, pt.TopicName)
414-
if err != nil {
415-
return nil, err
416-
}
417-
pt.TopicName = ft
418-
419-
return pt, nil
420-
}
421-
422-
return pkt, nil
423-
}

docker/.env

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,19 @@ SMQ_MESSAGE_BROKER_TYPE=msg_nats
4040
SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL}
4141

4242
## MQTT Broker
43-
SMQ_MQTT_BROKER_TYPE=rabbitmq
44-
SMQ_MQTT_BROKER_HEALTH_CHECK=
45-
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_RABBITMQ_MQTT_QOS}
43+
SMQ_MQTT_BROKER_TYPE=nats
44+
SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK}
45+
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_NATS_MQTT_QOS}
4646
SMQ_MQTT_ADAPTER_MQTT_TARGET_PROTOCOL=mqtt
4747
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
4848
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
49-
SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME=${SMQ_RABBITMQ_USER}
50-
SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD=${SMQ_RABBITMQ_PASS}
51-
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK}
49+
SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME=
50+
SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD=
51+
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK}
5252
SMQ_MQTT_ADAPTER_WS_TARGET_PROTOCOL=http
5353
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
54-
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=${SMQ_RABBITMQ_WS_PORT}
55-
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_RABBITMQ_WS_TARGET_PATH}
54+
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080
55+
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_NATS_WS_TARGET_PATH}
5656

5757
## Redis
5858
SMQ_REDIS_TCP_PORT=6379

docker/docker-compose.yaml

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,7 +1129,6 @@ services:
11291129
container_name: supermq-mqtt
11301130
depends_on:
11311131
- clients
1132-
- rabbitmq
11331132
- nats
11341133
restart: on-failure
11351134
environment:
@@ -1565,27 +1564,6 @@ services:
15651564
bind:
15661565
create_host_path: true
15671566

1568-
rabbitmq:
1569-
image: rabbitmq:4.0.5-management-alpine
1570-
container_name: supermq-rabbitmq
1571-
restart: on-failure
1572-
environment:
1573-
RABBITMQ_ERLANG_COOKIE: ${SMQ_RABBITMQ_COOKIE}
1574-
RABBITMQ_DEFAULT_USER: ${SMQ_RABBITMQ_USER}
1575-
RABBITMQ_DEFAULT_PASS: ${SMQ_RABBITMQ_PASS}
1576-
RABBITMQ_DEFAULT_VHOST: ${SMQ_RABBITMQ_VHOST}
1577-
RABBITMQ_CONFIG_FILES: /etc/rabbitmq/conf.d/
1578-
ports:
1579-
- ${SMQ_RABBITMQ_PORT}:${SMQ_RABBITMQ_PORT}
1580-
- ${SMQ_RABBITMQ_HTTP_PORT}:${SMQ_RABBITMQ_HTTP_PORT}
1581-
- ${SMQ_RABBITMQ_WS_PORT}:${SMQ_RABBITMQ_WS_PORT}
1582-
volumes:
1583-
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
1584-
- ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/conf.d/10-defaults.conf
1585-
- supermq-mqtt-broker-volume:/var/lib/rabbitmq
1586-
networks:
1587-
- supermq-base-net
1588-
15891567
nats:
15901568
image: nats:2.10.25-alpine
15911569
container_name: supermq-nats

mqtt/forwarder.go

Lines changed: 0 additions & 70 deletions
This file was deleted.

mqtt/handler.go

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"fmt"
99
"log/slog"
1010
"strings"
11-
"time"
1211

1312
"github.com/absmach/mgate/pkg/session"
1413
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
@@ -158,44 +157,11 @@ func (h *handler) Connect(ctx context.Context) error {
158157

159158
// Publish - after client successfully published.
160159
func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error {
161-
s, ok := session.FromContext(ctx)
162-
if !ok {
163-
return errors.Wrap(ErrFailedPublish, ErrClientNotInitialized)
164-
}
165-
h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic))
166-
167-
domainID, chanID, subTopic, topicType, err := h.parser.ParsePublishTopic(ctx, *topic, false)
168-
if err != nil {
169-
return errors.Wrap(ErrFailedPublish, err)
170-
}
171-
172-
msg := messaging.Message{
173-
Protocol: protocol,
174-
Domain: domainID,
175-
Channel: chanID,
176-
Subtopic: subTopic,
177-
Publisher: s.Username,
178-
Payload: *payload,
179-
Created: time.Now().UnixNano(),
180-
}
181-
182-
if topicType == messaging.MessageType {
183-
if err := h.publisher.Publish(ctx, messaging.EncodeMessageTopic(&msg), &msg); err != nil {
184-
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
185-
}
186-
}
187-
188160
return nil
189161
}
190162

191163
// Subscribe - after client successfully subscribed.
192164
func (h *handler) Subscribe(ctx context.Context, topics *[]string) error {
193-
s, ok := session.FromContext(ctx)
194-
if !ok {
195-
return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized)
196-
}
197-
h.logger.Info(fmt.Sprintf(LogInfoSubscribed, s.ID, strings.Join(*topics, ",")))
198-
199165
return nil
200166
}
201167

mqtt/intercepter.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package mqtt
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"time"
8+
9+
"github.com/absmach/mgate/pkg/session"
10+
"github.com/absmach/supermq/pkg/errors"
11+
"github.com/absmach/supermq/pkg/messaging"
12+
"github.com/eclipse/paho.mqtt.golang/packets"
13+
"google.golang.org/protobuf/proto"
14+
)
15+
16+
type beforeHandler struct {
17+
resolver messaging.TopicResolver
18+
parser messaging.TopicParser
19+
logger *slog.Logger
20+
}
21+
22+
func NewBeforeHandler(resolver messaging.TopicResolver, parser messaging.TopicParser, logger *slog.Logger) session.Interceptor {
23+
return &beforeHandler{
24+
resolver: resolver,
25+
parser: parser,
26+
logger: logger,
27+
}
28+
}
29+
30+
// This interceptor is used to replace domain and channel routes with relevant domain and channel IDs in the message topic.
31+
func (bh beforeHandler) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) {
32+
switch pt := pkt.(type) {
33+
case *packets.SubscribePacket:
34+
for i, topic := range pt.Topics {
35+
ft, err := bh.resolver.ResolveTopic(ctx, topic)
36+
if err != nil {
37+
return nil, err
38+
}
39+
pt.Topics[i] = ft
40+
}
41+
42+
return pt, nil
43+
case *packets.UnsubscribePacket:
44+
for i, topic := range pt.Topics {
45+
ft, err := bh.resolver.ResolveTopic(ctx, topic)
46+
if err != nil {
47+
return nil, err
48+
}
49+
pt.Topics[i] = ft
50+
}
51+
return pt, nil
52+
case *packets.PublishPacket:
53+
ft, err := bh.resolver.ResolveTopic(ctx, pt.TopicName)
54+
if err != nil {
55+
return nil, err
56+
}
57+
pt.TopicName = ft
58+
59+
s, ok := session.FromContext(ctx)
60+
if !ok {
61+
return pt, errors.Wrap(ErrFailedPublish, ErrClientNotInitialized)
62+
}
63+
bh.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, ft))
64+
65+
switch dir {
66+
case session.Up:
67+
domainID, chanID, subTopic, _, err := bh.parser.ParsePublishTopic(ctx, ft, false)
68+
if err != nil {
69+
return pt, errors.Wrap(ErrFailedPublish, err)
70+
}
71+
72+
msg := &messaging.Message{
73+
Protocol: "mqtt",
74+
Domain: domainID,
75+
Channel: chanID,
76+
Subtopic: subTopic,
77+
Publisher: s.Username,
78+
Payload: pt.Payload,
79+
Created: time.Now().UnixNano(),
80+
}
81+
82+
data, err := proto.Marshal(msg)
83+
if err != nil {
84+
return pt, err
85+
}
86+
pt.Payload = data
87+
88+
case session.Down:
89+
var msg messaging.Message
90+
91+
if err := proto.Unmarshal(pt.Payload, &msg); err != nil {
92+
return pt, errors.Wrap(ErrFailedPublish, err)
93+
}
94+
pt.Payload = msg.GetPayload()
95+
}
96+
97+
return pt, nil
98+
}
99+
100+
return pkt, nil
101+
}

0 commit comments

Comments
 (0)