Skip to content

Commit 9558537

Browse files
committed
mqtt
1 parent 22a7aa4 commit 9558537

File tree

7 files changed

+513
-26
lines changed

7 files changed

+513
-26
lines changed

device/device.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ func init() {
1313
type Device struct {
1414
Id string `json:"id,omitempty" xorm:"pk"`
1515
ProductId string `json:"product_id,omitempty" xorm:"index"`
16-
LinkerId string `json:"linker_id,omitempty" xorm:"index"`
17-
IncomingId string `json:"incoming_id,omitempty" xorm:"index"`
16+
LinkId string `json:"link_id,omitempty" xorm:"index"`
1817
Name string `json:"name,omitempty"`
1918
Description string `json:"description,omitempty"`
2019
Station map[string]any `json:"station,omitempty" xorm:"json"` //从站信息(协议定义表单)
@@ -27,3 +26,8 @@ type DeviceModel struct {
2726
Validators []*product.Validator `json:"validators,omitempty" xorm:"json"`
2827
Created time.Time `json:"created,omitempty" xorm:"created"`
2928
}
29+
30+
type Status struct {
31+
Online bool `json:"online,omitempty"`
32+
Error string `json:"error,omitempty"`
33+
}

internal/device.go

Lines changed: 288 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
package internal
22

33
import (
4+
"errors"
45
"github.com/busy-cloud/boat/db"
56
"github.com/busy-cloud/boat/lib"
67
"github.com/busy-cloud/boat/log"
78
"github.com/busy-cloud/boat/mqtt"
89
"github.com/god-jason/iot-master/device"
10+
"github.com/god-jason/iot-master/link"
911
"github.com/god-jason/iot-master/product"
1012
"github.com/god-jason/iot-master/project"
1113
"github.com/god-jason/iot-master/space"
14+
"math/rand"
15+
"strconv"
1216
"sync"
1317
"time"
1418
)
@@ -20,20 +24,41 @@ func GetDevice(id string) *Device {
2024
}
2125

2226
type Device struct {
23-
device.Device `xorm:"extends"`
27+
device.Device
28+
device.Status
2429

25-
valuesLock sync.RWMutex
26-
Values map[string]any `json:"values"`
27-
Updated time.Time `json:"updated"`
30+
values Values
31+
32+
linker string
33+
protocol string
2834

2935
projects []string
3036
spaces []string
3137

3238
validators []*Validator
39+
40+
waitingResponse map[string]chan any
41+
waitingLock sync.RWMutex
3342
}
3443

3544
func (d *Device) Open() error {
36-
d.Values = make(map[string]any)
45+
d.waitingResponse = make(map[string]chan any)
46+
47+
//加载连接(主要是协议)
48+
if d.LinkId != "" {
49+
var lnk link.Link
50+
has, err := db.Engine().ID(d.LinkId).Get(&lnk)
51+
if err != nil {
52+
d.Error = err.Error()
53+
return err
54+
}
55+
if !has {
56+
d.Error = "没有指定链接"
57+
return errors.New(d.Error)
58+
}
59+
d.protocol = lnk.Protocol
60+
d.linker = lnk.Linker
61+
}
3762

3863
//查询绑定的项目
3964
var ps []*project.ProjectDevice
@@ -68,8 +93,9 @@ func (d *Device) Open() error {
6893
}
6994
vv := &Validator{Validator: v}
7095
d.validators = append(d.validators, vv)
71-
err = vv.Build() //重复编译了
96+
err = vv.Build() //TODO 重复编译了
7297
if err != nil {
98+
d.Error = err.Error()
7399
log.Error(err)
74100
}
75101
}
@@ -89,6 +115,7 @@ func (d *Device) Open() error {
89115
d.validators = append(d.validators, vv)
90116
err = vv.Build() //重复编译了
91117
if err != nil {
118+
d.Error = err.Error()
92119
log.Error(err)
93120
}
94121
}
@@ -103,29 +130,21 @@ func (d *Device) PutValues(values map[string]any) {
103130

104131
//广播消息
105132
var topics []string
106-
topics = append(topics, "device/"+d.Id+"/values")
107133
for _, p := range d.projects {
108-
topics = append(topics, "project/"+p+"/device/"+d.Id+"/property")
134+
topics = append(topics, "project/"+p+"/device/"+d.Id+"/values")
109135
}
110136
for _, s := range d.spaces {
111-
topics = append(topics, "space/"+s+"/device/"+d.Id+"/property")
137+
topics = append(topics, "space/"+s+"/device/"+d.Id+"/values")
112138
}
113139
if len(topics) > 0 {
114140
mqtt.PublishEx(topics, values)
115141
}
116142

117-
d.valuesLock.Lock()
118-
defer d.valuesLock.Unlock() //TODO 后续发消息和入库,锁的时间比较长
119-
120-
//更新数据
121-
for k, v := range values {
122-
d.Values[k] = v
123-
}
124-
d.Values["__update"] = time.Now()
143+
d.values.Put(values)
125144

126145
//检查属性
127146
for _, v := range d.validators {
128-
alarm, err := v.Evaluate(d.Values)
147+
alarm, err := v.Evaluate(d.values.Get())
129148
if err != nil {
130149
log.Error(err)
131150
}
@@ -150,3 +169,254 @@ func (d *Device) PutValues(values map[string]any) {
150169
}
151170
}
152171
}
172+
173+
func (d *Device) GetValues() map[string]any {
174+
return d.values.Get()
175+
}
176+
177+
type SyncRequest struct {
178+
MsgId string `json:"msg_id"`
179+
DeviceId string `json:"device_id"`
180+
}
181+
182+
type SyncResponse struct {
183+
MsgId string `json:"msg_id"`
184+
DeviceId string `json:"device_id"`
185+
Values map[string]any `json:"values"`
186+
}
187+
188+
func (d *Device) Sync(timeout int) (map[string]any, error) {
189+
req := SyncRequest{
190+
MsgId: strconv.FormatInt(rand.Int63(), 10),
191+
DeviceId: d.Id,
192+
}
193+
token := mqtt.Publish("protocol/"+d.protocol+"/"+d.linker+"/"+d.LinkId+"/sync", &req)
194+
token.Wait()
195+
err := token.Error()
196+
if err != nil {
197+
return nil, err
198+
}
199+
200+
//等待消息
201+
ch := make(chan any)
202+
203+
d.waitingLock.Lock()
204+
d.waitingResponse[req.MsgId] = ch
205+
d.waitingLock.Unlock()
206+
207+
if timeout < 1 {
208+
timeout = 30
209+
}
210+
211+
select {
212+
case resp := <-ch:
213+
if res, ok := resp.(*SyncResponse); ok {
214+
return res.Values, nil
215+
} else {
216+
return nil, errors.New("want type SyncResponse")
217+
}
218+
case <-time.After(time.Duration(timeout) * time.Second):
219+
220+
d.waitingLock.Lock()
221+
delete(d.waitingResponse, req.MsgId)
222+
d.waitingLock.Unlock()
223+
224+
return nil, errors.New("请求超时")
225+
}
226+
}
227+
228+
func (d *Device) onSyncResponse(resp *SyncResponse) {
229+
d.waitingLock.RLock()
230+
defer d.waitingLock.RUnlock()
231+
232+
if ch, ok := d.waitingResponse[resp.MsgId]; ok {
233+
ch <- ch
234+
}
235+
}
236+
237+
type ReadRequest struct {
238+
MsgId string `json:"msg_id"`
239+
DeviceId string `json:"device_id"`
240+
Points []string `json:"points"`
241+
}
242+
243+
type ReadResponse struct {
244+
MsgId string `json:"msg_id"`
245+
DeviceId string `json:"device_id"`
246+
Values map[string]any `json:"values"`
247+
}
248+
249+
func (d *Device) Read(points []string, timeout int) (map[string]any, error) {
250+
req := ReadRequest{
251+
MsgId: strconv.FormatInt(rand.Int63(), 10),
252+
DeviceId: d.Id,
253+
}
254+
token := mqtt.Publish("protocol/"+d.protocol+"/"+d.linker+"/"+d.LinkId+"/read", &req)
255+
token.Wait()
256+
err := token.Error()
257+
if err != nil {
258+
return nil, err
259+
}
260+
261+
//等待消息
262+
ch := make(chan any)
263+
264+
d.waitingLock.Lock()
265+
d.waitingResponse[req.MsgId] = ch
266+
d.waitingLock.Unlock()
267+
268+
if timeout < 1 {
269+
timeout = 30
270+
}
271+
272+
select {
273+
case resp := <-ch:
274+
if res, ok := resp.(*ReadResponse); ok {
275+
return res.Values, nil
276+
} else {
277+
return nil, errors.New("want type ReadResponse")
278+
}
279+
case <-time.After(time.Duration(timeout) * time.Second):
280+
281+
d.waitingLock.Lock()
282+
delete(d.waitingResponse, req.MsgId)
283+
d.waitingLock.Unlock()
284+
285+
return nil, errors.New("请求超时")
286+
}
287+
}
288+
289+
func (d *Device) onReadResponse(resp *ReadResponse) {
290+
d.waitingLock.RLock()
291+
defer d.waitingLock.RUnlock()
292+
293+
if ch, ok := d.waitingResponse[resp.MsgId]; ok {
294+
ch <- ch
295+
}
296+
}
297+
298+
type WriteRequest struct {
299+
MsgId string `json:"msg_id"`
300+
DeviceId string `json:"device_id"`
301+
Values map[string]any `json:"values"`
302+
}
303+
304+
type WriteResponse struct {
305+
MsgId string `json:"msg_id"`
306+
DeviceId string `json:"device_id"`
307+
Result map[string]bool `json:"result"`
308+
}
309+
310+
func (d *Device) Write(values map[string]any, timeout int) (map[string]bool, error) {
311+
req := WriteRequest{
312+
MsgId: strconv.FormatInt(rand.Int63(), 10),
313+
DeviceId: d.Id,
314+
Values: values,
315+
}
316+
token := mqtt.Publish("protocol/"+d.protocol+"/"+d.linker+"/"+d.LinkId+"/write", &req)
317+
token.Wait()
318+
err := token.Error()
319+
if err != nil {
320+
return nil, err
321+
}
322+
323+
//等待消息
324+
ch := make(chan any)
325+
326+
d.waitingLock.Lock()
327+
d.waitingResponse[req.MsgId] = ch
328+
d.waitingLock.Unlock()
329+
330+
if timeout < 1 {
331+
timeout = 30
332+
}
333+
334+
select {
335+
case resp := <-ch:
336+
if res, ok := resp.(*WriteResponse); ok {
337+
return res.Result, nil
338+
} else {
339+
return nil, errors.New("want type WriteResponse")
340+
}
341+
case <-time.After(time.Duration(timeout) * time.Second):
342+
343+
d.waitingLock.Lock()
344+
delete(d.waitingResponse, req.MsgId)
345+
d.waitingLock.Unlock()
346+
347+
return nil, errors.New("请求超时")
348+
}
349+
}
350+
351+
func (d *Device) onWriteResponse(resp *WriteResponse) {
352+
d.waitingLock.RLock()
353+
defer d.waitingLock.RUnlock()
354+
355+
if ch, ok := d.waitingResponse[resp.MsgId]; ok {
356+
ch <- ch
357+
}
358+
}
359+
360+
type ActionRequest struct {
361+
MsgId string `json:"msg_id"`
362+
DeviceId string `json:"device_id"`
363+
Action string `json:"action"`
364+
Parameters map[string]any `json:"parameters"`
365+
}
366+
367+
type ActionResponse struct {
368+
MsgId string `json:"msg_id"`
369+
DeviceId string `json:"device_id"`
370+
Result map[string]any `json:"result"`
371+
}
372+
373+
func (d *Device) Action(action string, parameters map[string]any, timeout int) (map[string]any, error) {
374+
req := ActionRequest{
375+
MsgId: strconv.FormatInt(rand.Int63(), 10),
376+
DeviceId: d.Id,
377+
Action: action,
378+
Parameters: parameters,
379+
}
380+
token := mqtt.Publish("protocol/"+d.protocol+"/"+d.linker+"/"+d.LinkId+"/action", &req)
381+
token.Wait()
382+
err := token.Error()
383+
if err != nil {
384+
return nil, err
385+
}
386+
387+
//等待消息
388+
ch := make(chan any)
389+
390+
d.waitingLock.Lock()
391+
d.waitingResponse[req.MsgId] = ch
392+
d.waitingLock.Unlock()
393+
394+
if timeout < 1 {
395+
timeout = 30
396+
}
397+
398+
select {
399+
case resp := <-ch:
400+
if res, ok := resp.(*ActionResponse); ok {
401+
return res.Result, nil
402+
} else {
403+
return nil, errors.New("want type ActionResponse")
404+
}
405+
case <-time.After(time.Duration(timeout) * time.Second):
406+
407+
d.waitingLock.Lock()
408+
delete(d.waitingResponse, req.MsgId)
409+
d.waitingLock.Unlock()
410+
411+
return nil, errors.New("请求超时")
412+
}
413+
}
414+
415+
func (d *Device) onActionResponse(resp *ActionResponse) {
416+
d.waitingLock.RLock()
417+
defer d.waitingLock.RUnlock()
418+
419+
if ch, ok := d.waitingResponse[resp.MsgId]; ok {
420+
ch <- ch
421+
}
422+
}

0 commit comments

Comments
 (0)