Skip to content

Commit 6495d05

Browse files
committed
terminal+queue: add concurrent error queue
This commit adds a generic type queue which is basically an implementation of a channel with an infinite buffer, so it should never block when sending to it.
1 parent e275726 commit 6495d05

File tree

2 files changed

+140
-0
lines changed

2 files changed

+140
-0
lines changed

queue/queue.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package queue
2+
3+
import (
4+
"container/list"
5+
"sync"
6+
)
7+
8+
const (
9+
// DefaultQueueSize is the default size to use for concurrent queues.
10+
DefaultQueueSize = 10
11+
)
12+
13+
// ConcurrentQueue is a typed concurrent-safe FIFO queue with unbounded
14+
// capacity. Clients interact with the queue by pushing items into the in
15+
// channel and popping items from the out channel. There is a goroutine that
16+
// manages moving items from the in channel to the out channel in the correct
17+
// order that must be started by calling Start().
18+
type ConcurrentQueue[T any] struct {
19+
started sync.Once
20+
stopped sync.Once
21+
22+
chanIn chan T
23+
chanOut chan T
24+
overflow *list.List
25+
26+
wg sync.WaitGroup
27+
quit chan struct{}
28+
}
29+
30+
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
31+
// the capacity of the output channel. When the size of the queue is below this
32+
// threshold, pushes do not incur the overhead of the less efficient overflow
33+
// structure.
34+
func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T] {
35+
return &ConcurrentQueue[T]{
36+
chanIn: make(chan T),
37+
chanOut: make(chan T, bufferSize),
38+
overflow: list.New(),
39+
quit: make(chan struct{}),
40+
}
41+
}
42+
43+
// ChanIn returns a channel that can be used to push new items into the queue.
44+
func (cq *ConcurrentQueue[T]) ChanIn() chan<- T {
45+
return cq.chanIn
46+
}
47+
48+
// ChanOut returns a channel that can be used to pop items from the queue.
49+
func (cq *ConcurrentQueue[T]) ChanOut() <-chan T {
50+
return cq.chanOut
51+
}
52+
53+
// Start begins a goroutine that manages moving items from the in channel to the
54+
// out channel. The queue tries to move items directly to the out channel
55+
// minimize overhead, but if the out channel is full it pushes items to an
56+
// overflow queue. This must be called before using the queue.
57+
func (cq *ConcurrentQueue[T]) Start() {
58+
cq.started.Do(cq.start)
59+
}
60+
61+
func (cq *ConcurrentQueue[T]) start() {
62+
cq.wg.Add(1)
63+
go func() {
64+
defer cq.wg.Done()
65+
66+
readLoop:
67+
for {
68+
nextElement := cq.overflow.Front()
69+
if nextElement == nil {
70+
// Overflow queue is empty so incoming items can
71+
// be pushed directly to the output channel. If
72+
// output channel is full though, push to
73+
// overflow.
74+
select {
75+
case item, ok := <-cq.chanIn:
76+
if !ok {
77+
break readLoop
78+
}
79+
select {
80+
case cq.chanOut <- item:
81+
// Optimistically push directly
82+
// to chanOut.
83+
default:
84+
cq.overflow.PushBack(item)
85+
}
86+
case <-cq.quit:
87+
return
88+
}
89+
} else {
90+
// Overflow queue is not empty, so any new items
91+
// get pushed to the back to preserve order.
92+
select {
93+
case item, ok := <-cq.chanIn:
94+
if !ok {
95+
break readLoop
96+
}
97+
cq.overflow.PushBack(item)
98+
case cq.chanOut <- nextElement.Value.(T):
99+
cq.overflow.Remove(nextElement)
100+
case <-cq.quit:
101+
return
102+
}
103+
}
104+
}
105+
106+
// Incoming channel has been closed. Empty overflow queue into
107+
// the outgoing channel.
108+
nextElement := cq.overflow.Front()
109+
for nextElement != nil {
110+
select {
111+
case cq.chanOut <- nextElement.Value.(T):
112+
cq.overflow.Remove(nextElement)
113+
case <-cq.quit:
114+
return
115+
}
116+
nextElement = cq.overflow.Front()
117+
}
118+
119+
// Close outgoing channel.
120+
close(cq.chanOut)
121+
}()
122+
}
123+
124+
// Stop ends the goroutine that moves items from the in channel to the out
125+
// channel. This does not clear the queue state, so the queue can be restarted
126+
// without dropping items.
127+
func (cq *ConcurrentQueue[T]) Stop() {
128+
cq.stopped.Do(func() {
129+
close(cq.quit)
130+
cq.wg.Wait()
131+
})
132+
}

terminal.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/lightninglabs/faraday/frdrpc"
2323
"github.com/lightninglabs/faraday/frdrpcserver"
2424
"github.com/lightninglabs/lightning-terminal/litrpc"
25+
"github.com/lightninglabs/lightning-terminal/queue"
2526
mid "github.com/lightninglabs/lightning-terminal/rpcmiddleware"
2627
"github.com/lightninglabs/lightning-terminal/session"
2728
"github.com/lightninglabs/lndclient"
@@ -198,6 +199,13 @@ func (g *LightningTerminal) Run() error {
198199
// Show version at startup.
199200
log.Infof("LiT version: %s", Version())
200201

202+
// This concurrent error queue can be used by every component that can
203+
// raise runtime errors. Using a queue will prevent us from blocking on
204+
// sending errors to it, as long as the queue is running.
205+
errQueue := queue.NewConcurrentQueue[error](queue.DefaultQueueSize)
206+
errQueue.Start()
207+
defer errQueue.Stop()
208+
201209
// Construct a new PermissionsManager.
202210
g.permsMgr, err = NewPermissionsManager()
203211
if err != nil {

0 commit comments

Comments
 (0)