@@ -22,6 +22,7 @@ import (
22
22
"context"
23
23
"crypto/tls"
24
24
"crypto/x509"
25
+ "errors"
25
26
"fmt"
26
27
"reflect"
27
28
"sync"
@@ -41,6 +42,7 @@ import (
41
42
"github.com/plgd-dev/device/v2/schema/cloud"
42
43
"github.com/plgd-dev/device/v2/schema/device"
43
44
plgdResources "github.com/plgd-dev/device/v2/schema/resources"
45
+ "github.com/plgd-dev/go-coap/v3/message"
44
46
"github.com/plgd-dev/go-coap/v3/message/codes"
45
47
"github.com/plgd-dev/go-coap/v3/message/pool"
46
48
"github.com/plgd-dev/go-coap/v3/mux"
@@ -50,8 +52,6 @@ import (
50
52
"github.com/plgd-dev/go-coap/v3/tcp/client"
51
53
)
52
54
53
- const tickInterval = time .Second * 10
54
-
55
55
type (
56
56
GetLinksFilteredBy func (endpoints schema.Endpoints , deviceIDfilter uuid.UUID , resourceTypesFitler []string , policyBitMaskFitler schema.BitMask ) (links schema.ResourceLinks )
57
57
GetCertificates func (deviceID string ) []tls.Certificate
@@ -82,22 +82,25 @@ type Manager struct {
82
82
caPool CAPoolGetter
83
83
getCertificates GetCertificates
84
84
removeCloudCAs RemoveCloudCAs
85
+ tickInterval time.Duration
85
86
86
87
private struct {
87
88
mutex sync.Mutex
88
89
cfg Configuration
89
90
previousCloudIDs []string
90
91
readyToPublishResources map [string ]struct {}
91
92
readyToUnpublishResources map [string ]struct {}
93
+ creds ocfCloud.CoapSignUpResponse
92
94
}
93
95
94
96
logger log.Logger
95
- creds ocfCloud.CoapSignUpResponse
96
97
client * client.Conn
97
98
signedIn bool
98
99
resourcesPublished bool
100
+ forceRefreshToken bool
99
101
done chan struct {}
100
102
stopped atomic.Bool
103
+ reconnect atomic.Bool
101
104
trigger chan bool
102
105
loop * eventloop.Loop
103
106
}
@@ -114,7 +117,8 @@ func New(cfg Config, deviceID uuid.UUID, save func(), handler net.RequestHandler
114
117
removeCloudCAs : func (... string ) {
115
118
// do nothing
116
119
},
117
- logger : log .NewNilLogger (),
120
+ logger : log .NewNilLogger (),
121
+ tickInterval : time .Second * 10 ,
118
122
}
119
123
for _ , opt := range opts {
120
124
opt (& o )
@@ -133,6 +137,7 @@ func New(cfg Config, deviceID uuid.UUID, save func(), handler net.RequestHandler
133
137
removeCloudCAs : o .removeCloudCAs ,
134
138
logger : o .logger ,
135
139
loop : loop ,
140
+ tickInterval : o .tickInterval ,
136
141
}
137
142
c .private .cfg .ProvisioningStatus = cloud .ProvisioningStatus_UNINITIALIZED
138
143
c .importConfig (cfg )
@@ -186,6 +191,13 @@ func (c *Manager) handleTrigger(value reflect.Value, closed bool) {
186
191
if wantToReset {
187
192
c .resetCredentials (ctx , true )
188
193
}
194
+ if c .reconnect .CompareAndSwap (true , false ) {
195
+ err := c .close ()
196
+ if err != nil && ! errors .Is (err , context .Canceled ) {
197
+ c .logger .Errorf ("cannot close connection for reconnect: %w" , err )
198
+ }
199
+ return
200
+ }
189
201
if ! c .isInitialized () {
190
202
// resources will be published after sign in
191
203
c .resetPublishing ()
@@ -220,7 +232,7 @@ func (c *Manager) Init() {
220
232
if c .private .cfg .URL != "" {
221
233
c .triggerRunner (false )
222
234
}
223
- t := time .NewTicker (tickInterval )
235
+ t := time .NewTicker (c . tickInterval )
224
236
handlers := []eventloop.Handler {
225
237
eventloop .NewReadHandler (reflect .ValueOf (c .trigger ), c .handleTrigger ),
226
238
eventloop .NewReadHandler (reflect .ValueOf (t .C ), c .handleTimer ),
@@ -242,14 +254,16 @@ func (c *Manager) resetCredentials(ctx context.Context, signOff bool) {
242
254
c .logger .Debugf ("%w" , err )
243
255
}
244
256
}
245
- c .creds = ocfCloud.CoapSignUpResponse {}
246
- c .signedIn = false
257
+ c .setCreds (ocfCloud.CoapSignUpResponse {})
247
258
c .resourcesPublished = false
259
+ c .forceRefreshToken = false
260
+ c .reconnect .Store (false )
248
261
if err := c .close (); err != nil {
249
262
c .logger .Warnf ("cannot close connection: %w" , err )
250
263
}
251
264
c .save ()
252
265
c .removePreviousCloudIDs ()
266
+ c .logger .Infof ("reset credentials" )
253
267
}
254
268
255
269
func (c * Manager ) cleanup () {
@@ -347,29 +361,40 @@ func validUntil(expiresIn int64) time.Time {
347
361
}
348
362
349
363
func (c * Manager ) setCreds (creds ocfCloud.CoapSignUpResponse ) {
350
- c .creds = creds
364
+ c .private .mutex .Lock ()
365
+ defer c .private .mutex .Unlock ()
366
+ c .private .creds = creds
351
367
c .signedIn = false
352
368
}
353
369
370
+ func (c * Manager ) updateCreds (f func (creds * ocfCloud.CoapSignUpResponse )) {
371
+ c .private .mutex .Lock ()
372
+ defer c .private .mutex .Unlock ()
373
+ f (& c .private .creds )
374
+ }
375
+
354
376
func (c * Manager ) getCreds () ocfCloud.CoapSignUpResponse {
355
- return c .creds
377
+ c .private .mutex .Lock ()
378
+ defer c .private .mutex .Unlock ()
379
+ return c .private .creds
356
380
}
357
381
358
382
func (c * Manager ) isCredsExpiring () bool {
359
- if c .creds .ValidUntil .IsZero () {
383
+ creds := c .getCreds ()
384
+ if creds .ValidUntil .IsZero () {
360
385
return false
361
386
}
362
- diff := time .Until (c . creds .ValidUntil )
363
- if diff < tickInterval * 2 {
387
+ diff := time .Until (creds .ValidUntil )
388
+ if diff < c . tickInterval * 2 {
364
389
// refresh token before it expires
365
390
return true
366
391
}
367
392
// refresh token when it is 1/3 before it expires
368
- return time .Now ().After (c . creds .ValidUntil .Add (- diff / 3 ))
393
+ return time .Now ().After (creds .ValidUntil .Add (- diff / 3 ))
369
394
}
370
395
371
- func getResourceTypesFilter (request * mux. Message ) []string {
372
- queries , _ := request . Options () .Queries ()
396
+ func getResourceTypesFilter (messageOptions message. Options ) []string {
397
+ queries , _ := messageOptions .Queries ()
373
398
resourceTypesFitler := []string {}
374
399
for _ , q := range queries {
375
400
if len (q ) > 3 && q [:3 ] == "rt=" {
@@ -379,37 +404,64 @@ func getResourceTypesFilter(request *mux.Message) []string {
379
404
return resourceTypesFitler
380
405
}
381
406
382
- func ( c * Manager ) serveCOAP ( w mux. ResponseWriter , request * mux.Message ) {
383
- request .Message . AddQuery ( "di=" + c . deviceID . String ())
384
- r := net. Request {
385
- Message : request . Message ,
386
- Endpoints : nil ,
387
- Conn : w . Conn (),
407
+ func inFilterSupportedCodes ( request * mux.Message ) bool {
408
+ switch request .Code () {
409
+ case codes . POST , codes . PUT , codes . DELETE , codes . GET :
410
+ return true
411
+ default :
412
+ return false
388
413
}
389
- var resp * pool.Message
414
+ }
415
+
416
+ func (c * Manager ) handleDeviceResource (r * net.Request ) (* pool.Message , error ) {
417
+ links := c .getLinks (schema.Endpoints {}, c .deviceID , nil , resources .PublishToCloud )
418
+ for _ , link := range links {
419
+ if link .HasType (device .ResourceType ) {
420
+ _ = r .SetPath (link .Href )
421
+ break
422
+ }
423
+ }
424
+ return c .handler (r )
425
+ }
426
+
427
+ func (c * Manager ) handleDiscoveryResource (r * net.Request ) (* pool.Message , error ) {
428
+ links := c .getLinks (schema.Endpoints {}, c .deviceID , getResourceTypesFilter (r .Message .Options ()), resources .PublishToCloud )
429
+ links = patchDeviceLink (links )
430
+ links = discovery .PatchLinks (links , c .deviceID .String ())
431
+ return resources .CreateResponseContent (r .Context (), links , codes .Content )
432
+ }
433
+
434
+ func (c * Manager ) getHandler (r * net.Request ) func (r * net.Request ) (* pool.Message , error ) {
435
+ h := c .handler
390
436
p , err := r .Path ()
391
437
if err == nil {
392
438
switch p {
393
439
case device .ResourceURI :
394
- links := c .getLinks (schema.Endpoints {}, c .deviceID , nil , resources .PublishToCloud )
395
- for _ , link := range links {
396
- if link .HasType (device .ResourceType ) {
397
- _ = r .SetPath (link .Href )
398
- break
399
- }
400
- }
401
- resp , err = c .handler (& r )
440
+ h = c .handleDeviceResource
402
441
case plgdResources .ResourceURI :
403
- links := c .getLinks (schema.Endpoints {}, c .deviceID , getResourceTypesFilter (request ), resources .PublishToCloud )
404
- links = patchDeviceLink (links )
405
- links = discovery .PatchLinks (links , c .deviceID .String ())
406
- resp , err = resources .CreateResponseContent (request .Context (), links , codes .Content )
407
- default :
408
- resp , err = c .handler (& r )
442
+ h = c .handleDiscoveryResource
409
443
}
410
- } else {
411
- resp , err = c .handler (& r )
412
444
}
445
+ return h
446
+ }
447
+
448
+ func (c * Manager ) serveCOAP (w mux.ResponseWriter , request * mux.Message ) {
449
+ if ! inFilterSupportedCodes (request ) {
450
+ // ignore unsupported request
451
+ if w .Conn ().Context ().Err () == nil {
452
+ // log only if connection is still open
453
+ c .logger .Debugf ("unsupported request: %v\n " , request )
454
+ }
455
+ return
456
+ }
457
+ request .Message .AddQuery ("di=" + c .deviceID .String ())
458
+ r := net.Request {
459
+ Message : request .Message ,
460
+ Endpoints : nil ,
461
+ Conn : w .Conn (),
462
+ }
463
+ h := c .getHandler (& r )
464
+ resp , err := h (& r )
413
465
if err != nil {
414
466
resp = net .CreateResponseError (request .Context (), err , request .Token ())
415
467
}
@@ -502,8 +554,9 @@ func patchDeviceLink(links schema.ResourceLinks) schema.ResourceLinks {
502
554
503
555
func (c * Manager ) connect (ctx context.Context ) error {
504
556
funcs := make ([]func (ctx context.Context ) error , 0 , 5 )
505
- if c .isCredsExpiring () {
557
+ if c .isCredsExpiring () || c . forceRefreshToken {
506
558
funcs = append (funcs , c .refreshToken )
559
+ c .forceRefreshToken = false
507
560
}
508
561
funcs = append (funcs , []func (ctx context.Context ) error {
509
562
c .signUp ,
@@ -517,7 +570,7 @@ func (c *Manager) connect(ctx context.Context) error {
517
570
}
518
571
for _ , f := range funcs {
519
572
r := func (ctx context.Context ) error {
520
- fctx , cancel := context .WithTimeout (ctx , time . Second * 10 )
573
+ fctx , cancel := context .WithTimeout (ctx , c . tickInterval )
521
574
defer cancel ()
522
575
return f (fctx )
523
576
}
@@ -584,6 +637,11 @@ func (c *Manager) popReadyToPublishResources() map[string]struct{} {
584
637
return res
585
638
}
586
639
640
+ func (c * Manager ) Reconnect () {
641
+ c .reconnect .Store (true )
642
+ c .triggerRunner (false )
643
+ }
644
+
587
645
func (c * Manager ) popReadyToUnpublishResources (count int ) []string {
588
646
c .private .mutex .Lock ()
589
647
defer c .private .mutex .Unlock ()
0 commit comments