Skip to content

Commit ac50113

Browse files
committed
Implement eventloop to reduce number of goroutines in bridge device
1 parent defcd1e commit ac50113

File tree

10 files changed

+415
-80
lines changed

10 files changed

+415
-80
lines changed

bridge/device/cloud/manager.go

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"crypto/tls"
2424
"crypto/x509"
2525
"fmt"
26+
"reflect"
2627
"sync"
2728
"time"
2829

@@ -31,6 +32,7 @@ import (
3132
"github.com/plgd-dev/device/v2/bridge/resources"
3233
"github.com/plgd-dev/device/v2/bridge/resources/discovery"
3334
"github.com/plgd-dev/device/v2/pkg/codec/cbor"
35+
"github.com/plgd-dev/device/v2/pkg/eventloop"
3436
"github.com/plgd-dev/device/v2/pkg/log"
3537
"github.com/plgd-dev/device/v2/pkg/net/coap"
3638
ocfCloud "github.com/plgd-dev/device/v2/pkg/ocf/cloud"
@@ -91,9 +93,10 @@ type Manager struct {
9193
resourcesPublished bool
9294
done chan struct{}
9395
trigger chan bool
96+
loop *eventloop.Loop
9497
}
9598

96-
func New(cfg Config, deviceID uuid.UUID, save func(), handler net.RequestHandler, getLinks GetLinksFilteredBy, caPool CAPoolGetter, opts ...Option) (*Manager, error) {
99+
func New(cfg Config, deviceID uuid.UUID, save func(), handler net.RequestHandler, getLinks GetLinksFilteredBy, caPool CAPoolGetter, loop *eventloop.Loop, opts ...Option) (*Manager, error) {
97100
if !caPool.IsValid() {
98101
return nil, fmt.Errorf("invalid ca pool")
99102
}
@@ -123,6 +126,7 @@ func New(cfg Config, deviceID uuid.UUID, save func(), handler net.RequestHandler
123126
getCertificates: o.getCertificates,
124127
removeCloudCAs: o.removeCloudCAs,
125128
logger: o.logger,
129+
loop: loop,
126130
}
127131
c.private.cfg.ProvisioningStatus = cloud.ProvisioningStatus_UNINITIALIZED
128132
c.importConfig(cfg)
@@ -162,11 +166,55 @@ func (c *Manager) importConfig(cfg Config) {
162166
})
163167
}
164168

169+
func (c *Manager) handleTrigger(value reflect.Value, closed bool) {
170+
if closed {
171+
return
172+
}
173+
ctx := context.Background()
174+
wantToReset := value.Bool()
175+
if wantToReset {
176+
c.resetCredentials(ctx, true)
177+
}
178+
if c.getCloudConfiguration().URL == "" {
179+
return
180+
}
181+
if err := c.connect(ctx); err != nil {
182+
c.logger.Errorf("cannot connect to cloud: %w", err)
183+
} else {
184+
c.setProvisioningStatus(cloud.ProvisioningStatus_REGISTERED)
185+
}
186+
}
187+
188+
func (c *Manager) handleTimer(_ reflect.Value, closed bool) {
189+
if closed {
190+
return
191+
}
192+
if c.getCloudConfiguration().URL == "" {
193+
return
194+
}
195+
if err := c.connect(context.Background()); err != nil {
196+
c.logger.Errorf("cannot connect to cloud: %w", err)
197+
} else {
198+
c.setProvisioningStatus(cloud.ProvisioningStatus_REGISTERED)
199+
}
200+
}
201+
165202
func (c *Manager) Init() {
166203
if c.private.cfg.URL != "" {
167204
c.triggerRunner(false)
168205
}
169-
go c.run()
206+
t := time.NewTicker(time.Second * 10)
207+
handlers := []eventloop.Handler{
208+
eventloop.NewReadHandler(reflect.ValueOf(c.trigger), c.handleTrigger),
209+
eventloop.NewReadHandler(reflect.ValueOf(t.C), c.handleTimer),
210+
eventloop.NewReadHandler(reflect.ValueOf(c.done), func(_ reflect.Value, _ bool) {
211+
_ = c.close()
212+
// cleanup resources
213+
c.loop.RemoveByChannels(reflect.ValueOf(c.done), reflect.ValueOf(t.C), reflect.ValueOf(c.trigger))
214+
t.Stop()
215+
}),
216+
}
217+
c.loop.Add(handlers...)
170218
}
171219

172220
func (c *Manager) resetCredentials(ctx context.Context, signOff bool) {
@@ -371,6 +419,7 @@ func (c *Manager) dial(ctx context.Context) error {
371419
}
372420
tlsConfig := &tls.Config{
373421
InsecureSkipVerify: true, //nolint:gosec
422+
MinVersion: tls.VersionTLS12,
374423
Certificates: c.getCertificates(c.deviceID.String()),
375424
VerifyPeerCertificate: coap.NewVerifyPeerCertificate(caPool, func(cert *x509.Certificate) error {
376425
cloudID, errP := uuid.Parse(c.getCloudConfiguration().CloudID)
@@ -426,34 +475,6 @@ func patchDeviceLink(links schema.ResourceLinks) schema.ResourceLinks {
426475
return links
427476
}
428477

429-
func (c *Manager) run() {
430-
ctx := context.Background()
431-
defer func() {
432-
if err := c.close(); err != nil {
433-
c.logger.Warnf("cannot close connection: %w", err)
434-
}
435-
}()
436-
t := time.NewTicker(time.Second * 10)
437-
for {
438-
select {
439-
case <-c.done:
440-
return
441-
case wantToReset := <-c.trigger:
442-
if wantToReset {
443-
c.resetCredentials(ctx, true)
444-
}
445-
case <-t.C:
446-
}
447-
if c.getCloudConfiguration().URL != "" {
448-
if err := c.connect(ctx); err != nil {
449-
c.logger.Errorf("cannot connect to cloud: %w", err)
450-
} else {
451-
c.setProvisioningStatus(cloud.ProvisioningStatus_REGISTERED)
452-
}
453-
}
454-
}
455-
}
456-
457478
func (c *Manager) connect(ctx context.Context) error {
458479
var funcs []func(ctx context.Context) error
459480
if c.isCredsExpiring() {

bridge/device/device.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/plgd-dev/device/v2/bridge/resources/discovery"
3535
"github.com/plgd-dev/device/v2/bridge/resources/maintenance"
3636
credentialResource "github.com/plgd-dev/device/v2/bridge/resources/secure/credential"
37+
"github.com/plgd-dev/device/v2/pkg/eventloop"
3738
pkgLog "github.com/plgd-dev/device/v2/pkg/log"
3839
"github.com/plgd-dev/device/v2/schema"
3940
cloudSchema "github.com/plgd-dev/device/v2/schema/cloud"
@@ -55,7 +56,7 @@ type Resource interface {
5556
GetResourceInterfaces() []string
5657
HandleRequest(req *net.Request) (*pool.Message, error)
5758
GetPolicyBitMask() schema.BitMask
58-
SetObserveHandler(createSubscription resources.CreateSubscriptionFunc)
59+
SetObserveHandler(loop *eventloop.Loop, createSubscription resources.CreateSubscriptionFunc)
5960
UpdateETag()
6061
}
6162

@@ -65,6 +66,9 @@ type Device struct {
6566
cloudManager *cloud.Manager
6667
credentialManager *credential.Manager
6768
onDeviceUpdated func(d *Device)
69+
loop *eventloop.Loop
70+
runLoop bool
71+
done chan struct{}
6872
}
6973

7074
func NewLogger(id uuid.UUID, level pkgLog.Level) pkgLog.Logger {
@@ -88,6 +92,10 @@ func (d *Device) GetResourceTypes() []string {
8892
return d.cfg.ResourceTypes
8993
}
9094

95+
func (d *Device) GetLoop() *eventloop.Loop {
96+
return d.loop
97+
}
98+
9199
func (d *Device) GetProtocolIndependentID() uuid.UUID {
92100
return d.cfg.ProtocolIndependentID
93101
}
@@ -115,6 +123,8 @@ func New(cfg Config, opts ...Option) (*Device, error) {
115123
getAdditionalProperties: func() map[string]interface{} { return nil },
116124
caPool: cloud.MakeCAPool(nil, false),
117125
logger: NewLogger(cfg.ID, pkgLog.LevelInfo),
126+
loop: eventloop.New(),
127+
runLoop: true,
118128
}
119129
for _, opt := range opts {
120130
opt(&o)
@@ -125,6 +135,11 @@ func New(cfg Config, opts ...Option) (*Device, error) {
125135
cfg: cfg,
126136
resources: sync.NewMap[string, Resource](),
127137
onDeviceUpdated: o.onDeviceUpdated,
138+
loop: o.loop,
139+
runLoop: o.runLoop,
140+
}
141+
if o.runLoop {
142+
d.done = make(chan struct{})
128143
}
129144

130145
cloudOpts := []cloud.Option{
@@ -145,7 +160,7 @@ func New(cfg Config, opts ...Option) (*Device, error) {
145160
}
146161
cm, err := cloud.New(cfg.Cloud.Config, d.cfg.ID, func() {
147162
d.onDeviceUpdated(d)
148-
}, d.HandleRequest, d.GetLinksFilteredBy, o.caPool, cloudOpts...)
163+
}, d.HandleRequest, d.GetLinksFilteredBy, o.caPool, o.loop, cloudOpts...)
149164
if err != nil {
150165
return nil, fmt.Errorf("cannot create cloud manager: %w", err)
151166
}
@@ -176,6 +191,9 @@ func (d *Device) Init() {
176191
if d.cloudManager != nil {
177192
d.cloudManager.Init()
178193
}
194+
if d.runLoop {
195+
go d.loop.Run(d.done)
196+
}
179197
}
180198

181199
func (d *Device) GetCloudManager() *cloud.Manager {
@@ -272,4 +290,10 @@ func (d *Device) Close() {
272290
for _, resource := range d.resources.LoadAndDeleteAll() {
273291
resource.Close()
274292
}
293+
if d.runLoop {
294+
select {
295+
case d.done <- struct{}{}:
296+
default:
297+
}
298+
}
275299
}

bridge/device/options.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/plgd-dev/device/v2/bridge/device/cloud"
2525
"github.com/plgd-dev/device/v2/bridge/resources/device"
26+
"github.com/plgd-dev/device/v2/pkg/eventloop"
2627
"github.com/plgd-dev/device/v2/pkg/log"
2728
)
2829

@@ -39,6 +40,8 @@ type OptionsCfg struct {
3940
getCertificates cloud.GetCertificates
4041
caPool CAPoolGetter
4142
logger log.Logger
43+
loop *eventloop.Loop
44+
runLoop bool
4245
}
4346

4447
type Option func(*OptionsCfg)
@@ -72,3 +75,10 @@ func WithLogger(logger log.Logger) Option {
7275
o.logger = logger
7376
}
7477
}
78+
79+
func WithEventLoop(loop *eventloop.Loop) Option {
80+
return func(o *OptionsCfg) {
81+
o.loop = loop
82+
o.runLoop = false
83+
}
84+
}

bridge/observeResource_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestObserveResource(t *testing.T) {
6767
rds.setName(newData.Name)
6868
return resHandler(req)
6969
}, []string{"oic.d.virtual", "oic.d.test"}, []string{interfaces.OC_IF_BASELINE, interfaces.OC_IF_RW})
70-
res.SetObserveHandler(func(req *net.Request, handler func(msg *pool.Message, err error)) (cancel func(), err error) {
70+
res.SetObserveHandler(d.GetLoop(), func(req *net.Request, handler func(msg *pool.Message, err error)) (cancel func(), err error) {
7171
ctx, cancel := context.WithCancel(context.Background())
7272
go func() {
7373
defer cancel()

0 commit comments

Comments
 (0)