Skip to content

Commit f8f60c2

Browse files
committed
terminal+rpcmiddleware: add main err queue
1 parent 6495d05 commit f8f60c2

File tree

2 files changed

+49
-37
lines changed

2 files changed

+49
-37
lines changed

rpcmiddleware/manager.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,23 @@ type Manager struct {
1414
lndClient lndclient.LightningClient
1515
interceptors []RequestInterceptor
1616

17-
errChan chan error
18-
wg sync.WaitGroup
19-
cancel context.CancelFunc
20-
quit chan struct{}
21-
stopOnce sync.Once
17+
mainErrChan chan<- error
18+
wg sync.WaitGroup
19+
cancel context.CancelFunc
20+
quit chan struct{}
21+
stopOnce sync.Once
2222
}
2323

2424
// NewManager returns a new middleware manager.
2525
func NewManager(interceptTimeout time.Duration,
26-
lndClient lndclient.LightningClient,
26+
lndClient lndclient.LightningClient, errChan chan<- error,
2727
interceptors ...RequestInterceptor) *Manager {
2828

2929
return &Manager{
3030
interceptTimeout: interceptTimeout,
3131
lndClient: lndClient,
3232
interceptors: interceptors,
33-
errChan: make(chan error),
33+
mainErrChan: errChan,
3434
quit: make(chan struct{}),
3535
}
3636
}
@@ -74,7 +74,7 @@ func (f *Manager) Start() error {
7474
err)
7575

7676
select {
77-
case f.errChan <- err:
77+
case f.mainErrChan <- err:
7878
case <-f.quit:
7979
case <-ctxc.Done():
8080
}
@@ -88,12 +88,6 @@ func (f *Manager) Start() error {
8888
return nil
8989
}
9090

91-
// Errors returns a channel on which any errors the manager encounters will be
92-
// delivered.
93-
func (f *Manager) Errors() chan error {
94-
return f.errChan
95-
}
96-
9791
// Stop shuts down the middleware manager.
9892
func (f *Manager) Stop() {
9993
f.stopOnce.Do(func() {

terminal.go

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ type LightningTerminal struct {
142142
// guards all incoming calls. This is only set in integrated mode!
143143
lndInterceptorChain *rpcperms.InterceptorChain
144144

145-
wg sync.WaitGroup
146-
lndErrChan chan error
145+
wg sync.WaitGroup
146+
errQueue *queue.ConcurrentQueue[error]
147147

148148
lndClient *lndclient.GrpcLndServices
149149
basicClient lnrpc.LightningClient
@@ -175,9 +175,7 @@ type LightningTerminal struct {
175175

176176
// New creates a new instance of the lightning-terminal daemon.
177177
func New() *LightningTerminal {
178-
return &LightningTerminal{
179-
lndErrChan: make(chan error, 1),
180-
}
178+
return &LightningTerminal{}
181179
}
182180

183181
// Run starts everything and then blocks until either the application is shut
@@ -202,9 +200,9 @@ func (g *LightningTerminal) Run() error {
202200
// This concurrent error queue can be used by every component that can
203201
// raise runtime errors. Using a queue will prevent us from blocking on
204202
// sending errors to it, as long as the queue is running.
205-
errQueue := queue.NewConcurrentQueue[error](queue.DefaultQueueSize)
206-
errQueue.Start()
207-
defer errQueue.Stop()
203+
g.errQueue = queue.NewConcurrentQueue[error](queue.DefaultQueueSize)
204+
g.errQueue.Start()
205+
defer g.errQueue.Stop()
208206

209207
// Construct a new PermissionsManager.
210208
g.permsMgr, err = NewPermissionsManager()
@@ -268,6 +266,7 @@ func (g *LightningTerminal) Run() error {
268266
readyChan := make(chan struct{})
269267
bufReadyChan := make(chan struct{})
270268
unlockChan := make(chan struct{})
269+
lndQuit := make(chan struct{})
271270
macChan := make(chan []byte, 1)
272271

273272
if g.cfg.LndMode == ModeIntegrated {
@@ -304,11 +303,11 @@ func (g *LightningTerminal) Run() error {
304303
(!ok || e.Type != flags.ErrHelp) {
305304

306305
log.Errorf("Error running main lnd: %v", err)
307-
g.lndErrChan <- err
306+
g.errQueue.ChanIn() <- err
308307
return
309308
}
310309

311-
close(g.lndErrChan)
310+
close(lndQuit)
312311
}()
313312
} else {
314313
close(unlockChan)
@@ -337,9 +336,12 @@ func (g *LightningTerminal) Run() error {
337336
// get the ready signal immediately.
338337
case <-readyChan:
339338

340-
case err := <-g.lndErrChan:
339+
case err := <-g.errQueue.ChanOut():
341340
return err
342341

342+
case <-lndQuit:
343+
return nil
344+
343345
case <-shutdownInterceptor.ShutdownChannel():
344346
return errors.New("shutting down")
345347
}
@@ -375,9 +377,12 @@ func (g *LightningTerminal) Run() error {
375377
select {
376378
case <-readyChan:
377379

378-
case err := <-g.lndErrChan:
380+
case err := <-g.errQueue.ChanOut():
379381
return err
380382

383+
case <-lndQuit:
384+
return nil
385+
381386
case <-shutdownInterceptor.ShutdownChannel():
382387
return errors.New("shutting down")
383388
}
@@ -404,12 +409,15 @@ func (g *LightningTerminal) Run() error {
404409
log.Errorf("Received critical error from loop, shutting down: "+
405410
"%v", err)
406411

407-
case err := <-g.lndErrChan:
412+
case err := <-g.errQueue.ChanOut():
408413
if err != nil {
409-
log.Errorf("Received critical error from lnd, "+
414+
log.Errorf("Received critical error from subsystem, "+
410415
"shutting down: %v", err)
411416
}
412417

418+
case <-lndQuit:
419+
return nil
420+
413421
case <-shutdownInterceptor.ShutdownChannel():
414422
log.Infof("Shutdown signal received")
415423
}
@@ -607,7 +615,7 @@ func (g *LightningTerminal) startSubservers() error {
607615
// Start the middleware manager.
608616
g.middleware = mid.NewManager(
609617
g.cfg.RPCMiddleware.InterceptTimeout,
610-
g.lndClient.Client,
618+
g.lndClient.Client, g.errQueue.ChanIn(),
611619
)
612620

613621
if err = g.middleware.Start(); err != nil {
@@ -935,15 +943,25 @@ func (g *LightningTerminal) shutdown() error {
935943

936944
g.wg.Wait()
937945

938-
// The lnd error channel is only used if we are actually running lnd in
939-
// the same process.
940-
if g.cfg.LndMode == ModeIntegrated {
941-
err := <-g.lndErrChan
942-
if err != nil {
943-
log.Errorf("Error stopping lnd: %v", err)
944-
returnErr = err
946+
// Do we have any last errors to display? We use an anonymous function,
947+
// so we can use return instead of breaking to a label in the default
948+
// case.
949+
func() {
950+
for {
951+
select {
952+
case err := <-g.errQueue.ChanOut():
953+
if err != nil {
954+
log.Errorf("Error while stopping "+
955+
"litd: %v", err)
956+
957+
returnErr = err
958+
}
959+
960+
default:
961+
return
962+
}
945963
}
946-
}
964+
}()
947965

948966
return returnErr
949967
}

0 commit comments

Comments
 (0)