Skip to content

Commit 13247c4

Browse files
committed
staticaddr: wait for address manager initiation
1 parent 59f67de commit 13247c4

File tree

7 files changed

+105
-69
lines changed

7 files changed

+105
-69
lines changed

loopd/daemon.go

Lines changed: 88 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
774774
infof("Liquidity manager stopped")
775775
}()
776776

777+
initManagerTimeout := 10 * time.Second
778+
777779
// Start the reservation manager.
778780
if d.reservationManager != nil {
779781
d.wg.Add(1)
@@ -792,9 +794,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
792794
}
793795
}()
794796

795-
// Wait for the reservation server to be ready before starting the
796-
// grpc server.
797-
timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second)
797+
// Wait for the reservation server to be ready before starting
798+
// the grpc server.
799+
timeOutCtx, cancel := context.WithTimeout(
800+
d.mainCtx, initManagerTimeout,
801+
)
798802
select {
799803
case <-timeOutCtx.Done():
800804
cancel()
@@ -822,9 +826,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
822826
}
823827
}()
824828

825-
// Wait for the instantout server to be ready before starting the
826-
// grpc server.
827-
timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second)
829+
// Wait for the instantout server to be ready before starting
830+
// the grpc server.
831+
timeOutCtx, cancel := context.WithTimeout(
832+
d.mainCtx, initManagerTimeout,
833+
)
828834
select {
829835
case <-timeOutCtx.Done():
830836
cancel()
@@ -839,67 +845,128 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
839845
// Start the static address manager.
840846
if staticAddressManager != nil {
841847
d.wg.Add(1)
848+
initChan := make(chan struct{})
842849
go func() {
843850
defer d.wg.Done()
844851

845852
infof("Starting static address manager...")
846-
err := staticAddressManager.Run(d.mainCtx)
853+
defer infof("Static address manager stopped")
854+
855+
err := staticAddressManager.Run(d.mainCtx, initChan)
847856
if err != nil && !errors.Is(context.Canceled, err) {
848857
d.internalErrChan <- err
849858
}
850-
infof("Static address manager stopped")
851859
}()
860+
861+
// Wait for the static address manager to be ready before
862+
// starting the grpc server.
863+
timeOutCtx, cancel := context.WithTimeout(
864+
d.mainCtx, initManagerTimeout,
865+
)
866+
select {
867+
case <-timeOutCtx.Done():
868+
cancel()
869+
return fmt.Errorf("static address manager not "+
870+
"ready: %v", timeOutCtx.Err())
871+
872+
case <-initChan:
873+
cancel()
874+
}
852875
}
853876

854877
// Start the static address deposit manager.
855878
if depositManager != nil {
856879
d.wg.Add(1)
880+
initChan := make(chan struct{})
857881
go func() {
858882
defer d.wg.Done()
859883

860884
infof("Starting static address deposit manager...")
861-
err := depositManager.Run(d.mainCtx)
885+
defer infof("Static address deposit manager stopped")
886+
887+
err := depositManager.Run(d.mainCtx, initChan)
862888
if err != nil && !errors.Is(context.Canceled, err) {
863889
d.internalErrChan <- err
864890
}
865-
infof("Static address deposit manager stopped")
866891
}()
867-
depositManager.WaitInitComplete()
892+
893+
// Wait for the static address manager to be ready before
894+
// starting the grpc server.
895+
timeOutCtx, cancel := context.WithTimeout(
896+
d.mainCtx, initManagerTimeout,
897+
)
898+
select {
899+
case <-timeOutCtx.Done():
900+
cancel()
901+
return fmt.Errorf("static address deposit manager "+
902+
"not ready: %v", timeOutCtx.Err())
903+
904+
case <-initChan:
905+
cancel()
906+
}
868907
}
869908

870909
// Start the static address deposit withdrawal manager.
871910
if withdrawalManager != nil {
872911
d.wg.Add(1)
912+
initChan := make(chan struct{})
873913
go func() {
874914
defer d.wg.Done()
875915

876-
infof("Starting static address deposit withdrawal " +
877-
"manager...")
878-
err := withdrawalManager.Run(d.mainCtx)
916+
infof("Starting static address withdrawal manager...")
917+
defer infof("Static address withdrawal manager stopped")
918+
919+
err := withdrawalManager.Run(d.mainCtx, initChan)
879920
if err != nil && !errors.Is(context.Canceled, err) {
880921
d.internalErrChan <- err
881922
}
882-
infof("Static address deposit withdrawal manager " +
883-
"stopped")
884923
}()
885-
withdrawalManager.WaitInitComplete()
924+
925+
// Wait for the static address withdrawal manager to be ready
926+
// before starting the grpc server.
927+
timeOutCtx, cancel := context.WithTimeout(
928+
d.mainCtx, initManagerTimeout,
929+
)
930+
select {
931+
case <-timeOutCtx.Done():
932+
cancel()
933+
return fmt.Errorf("static address withdrawal manager "+
934+
"server not ready: %v", timeOutCtx.Err())
935+
936+
case <-initChan:
937+
cancel()
938+
}
886939
}
887940

888941
// Start the static address loop-in manager.
889942
if staticLoopInManager != nil {
890943
d.wg.Add(1)
944+
initChan := make(chan struct{})
891945
go func() {
892946
defer d.wg.Done()
893947

894948
infof("Starting static address loop-in manager...")
895-
err := staticLoopInManager.Run(d.mainCtx)
949+
defer infof("Static address loop-in manager stopped")
950+
err := staticLoopInManager.Run(d.mainCtx, initChan)
896951
if err != nil && !errors.Is(context.Canceled, err) {
897952
d.internalErrChan <- err
898953
}
899-
infof("Starting static address loop-in manager " +
900-
"stopped")
901954
}()
902-
staticLoopInManager.WaitInitComplete()
955+
956+
// Wait for the static address loop-in manager to be ready before
957+
// starting the grpc server.
958+
timeOutCtx, cancel := context.WithTimeout(
959+
d.mainCtx, initManagerTimeout,
960+
)
961+
select {
962+
case <-timeOutCtx.Done():
963+
cancel()
964+
return fmt.Errorf("static address loop-in manager "+
965+
"not ready: %v", timeOutCtx.Err())
966+
967+
case <-initChan:
968+
cancel()
969+
}
903970
}
904971

905972
// Last, start our internal error handler. This will return exactly one

staticaddr/address/manager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,18 @@ func NewManager(cfg *ManagerConfig, currentHeight int32) *Manager {
6767
}
6868

6969
// Run runs the address manager.
70-
func (m *Manager) Run(ctx context.Context) error {
70+
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
7171
newBlockChan, newBlockErrChan, err :=
7272
m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx)
7373

7474
if err != nil {
7575
return err
7676
}
7777

78+
// Communicate to the caller that the address manager has completed its
79+
// initialization.
80+
close(initChan)
81+
7882
for {
7983
select {
8084
case currentHeight := <-newBlockChan:

staticaddr/address/manager_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,14 @@ func TestManager(t *testing.T) {
9595
testContext := NewAddressManagerTestContext(t)
9696

9797
// Start the manager.
98+
initChan := make(chan struct{})
9899
go func() {
99-
err := testContext.manager.Run(ctxb)
100+
err := testContext.manager.Run(ctxb, initChan)
100101
require.ErrorIs(t, err, context.Canceled)
101102
}()
102103

104+
<-initChan
105+
103106
// Create the expected static address.
104107
expectedAddress, err := GenerateExpectedTaprootAddress(testContext)
105108
require.NoError(t, err)

staticaddr/deposit/manager.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,6 @@ type Manager struct {
7777
// mu guards access to activeDeposits map.
7878
mu sync.Mutex
7979

80-
// initChan signals the daemon that the address manager has completed
81-
// its initialization.
82-
initChan chan struct{}
83-
8480
// activeDeposits contains all the active static address outputs.
8581
activeDeposits map[wire.OutPoint]*FSM
8682

@@ -100,15 +96,14 @@ type Manager struct {
10096
func NewManager(cfg *ManagerConfig) *Manager {
10197
return &Manager{
10298
cfg: cfg,
103-
initChan: make(chan struct{}),
10499
activeDeposits: make(map[wire.OutPoint]*FSM),
105100
deposits: make(map[wire.OutPoint]*Deposit),
106101
finalizedDepositChan: make(chan wire.OutPoint),
107102
}
108103
}
109104

110105
// Run runs the address manager.
111-
func (m *Manager) Run(ctx context.Context) error {
106+
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
112107
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) //nolint:lll
113108
if err != nil {
114109
return err
@@ -125,7 +120,7 @@ func (m *Manager) Run(ctx context.Context) error {
125120

126121
// Communicate to the caller that the address manager has completed its
127122
// initialization.
128-
close(m.initChan)
123+
close(initChan)
129124

130125
for {
131126
select {
@@ -209,12 +204,6 @@ func (m *Manager) recoverDeposits(ctx context.Context) error {
209204
return nil
210205
}
211206

212-
// WaitInitComplete waits until the address manager has completed its setup.
213-
func (m *Manager) WaitInitComplete() {
214-
defer log.Debugf("Static address deposit manager initiation complete.")
215-
<-m.initChan
216-
}
217-
218207
// pollDeposits polls new deposits to our static address and notifies the
219208
// manager's event loop about them.
220209
func (m *Manager) pollDeposits(ctx context.Context) {

staticaddr/deposit/manager_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ var (
4242

4343
blockErrChan = make(chan error)
4444

45-
initChan = make(chan struct{})
46-
4745
finalizedDepositChan = make(chan wire.OutPoint)
4846
)
4947

@@ -218,8 +216,9 @@ func TestManager(t *testing.T) {
218216
testContext := newManagerTestContext(t)
219217

220218
// Start the deposit manager.
219+
initChan := make(chan struct{})
221220
go func() {
222-
require.NoError(t, testContext.manager.Run(ctx))
221+
require.NoError(t, testContext.manager.Run(ctx, initChan))
223222
}()
224223

225224
// Ensure that the manager has been initialized.
@@ -337,7 +336,6 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
337336
}
338337

339338
manager := NewManager(cfg)
340-
manager.initChan = initChan
341339
manager.finalizedDepositChan = finalizedDepositChan
342340

343341
testContext := &ManagerTestContext{

staticaddr/loopin/manager.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,6 @@ type newSwapResponse struct {
116116
type Manager struct {
117117
cfg *Config
118118

119-
// initChan signals the daemon that the address manager has completed
120-
// its initialization.
121-
initChan chan struct{}
122-
123119
// newLoopInChan receives swap requests from the server and initiates
124120
// loop-in swaps.
125121
newLoopInChan chan *newSwapRequest
@@ -141,7 +137,6 @@ type Manager struct {
141137
func NewManager(cfg *Config, currentHeight uint32) *Manager {
142138
m := &Manager{
143139
cfg: cfg,
144-
initChan: make(chan struct{}),
145140
newLoopInChan: make(chan *newSwapRequest),
146141
exitChan: make(chan struct{}),
147142
errChan: make(chan error),
@@ -153,7 +148,7 @@ func NewManager(cfg *Config, currentHeight uint32) *Manager {
153148
}
154149

155150
// Run runs the static address loop-in manager.
156-
func (m *Manager) Run(ctx context.Context) error {
151+
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
157152
registerBlockNtfn := m.cfg.ChainNotifier.RegisterBlockEpochNtfn
158153
newBlockChan, newBlockErrChan, err := registerBlockNtfn(ctx)
159154
if err != nil {
@@ -175,7 +170,7 @@ func (m *Manager) Run(ctx context.Context) error {
175170

176171
// Communicate to the caller that the address manager has completed its
177172
// initialization.
178-
close(m.initChan)
173+
close(initChan)
179174

180175
var loopIn *StaticAddressLoopIn
181176
for {
@@ -493,13 +488,6 @@ func (m *Manager) recoverLoopIns(ctx context.Context) error {
493488
return nil
494489
}
495490

496-
// WaitInitComplete waits until the static address loop-in manager has completed
497-
// its setup.
498-
func (m *Manager) WaitInitComplete() {
499-
defer log.Debugf("Static address loop-in manager initiation complete.")
500-
<-m.initChan
501-
}
502-
503491
// DeliverLoopInRequest forwards a loop-in request from the server to the
504492
// manager run loop to initiate a new loop-in swap.
505493
func (m *Manager) DeliverLoopInRequest(ctx context.Context,

staticaddr/withdraw/manager.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,6 @@ type Manager struct {
113113
// mu protects access to finalizedWithdrawalTxns.
114114
mu sync.Mutex
115115

116-
// initChan signals the daemon that the withdrawal manager has completed
117-
// its initialization.
118-
initChan chan struct{}
119-
120116
// newWithdrawalRequestChan receives a list of outpoints that should be
121117
// withdrawn. The request is forwarded to the managers main loop.
122118
newWithdrawalRequestChan chan newWithdrawalRequest
@@ -139,7 +135,6 @@ type Manager struct {
139135
func NewManager(cfg *ManagerConfig, currentHeight uint32) *Manager {
140136
m := &Manager{
141137
cfg: cfg,
142-
initChan: make(chan struct{}),
143138
finalizedWithdrawalTxns: make(map[chainhash.Hash]*wire.MsgTx),
144139
exitChan: make(chan struct{}),
145140
newWithdrawalRequestChan: make(chan newWithdrawalRequest),
@@ -151,7 +146,7 @@ func NewManager(cfg *ManagerConfig, currentHeight uint32) *Manager {
151146
}
152147

153148
// Run runs the deposit withdrawal manager.
154-
func (m *Manager) Run(ctx context.Context) error {
149+
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
155150
newBlockChan, newBlockErrChan, err :=
156151
m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx)
157152

@@ -166,7 +161,7 @@ func (m *Manager) Run(ctx context.Context) error {
166161

167162
// Communicate to the caller that the address manager has completed its
168163
// initialization.
169-
close(m.initChan)
164+
close(initChan)
170165

171166
for {
172167
select {
@@ -274,14 +269,6 @@ func (m *Manager) recoverWithdrawals(ctx context.Context) error {
274269
return nil
275270
}
276271

277-
// WaitInitComplete waits until the address manager has completed its setup.
278-
func (m *Manager) WaitInitComplete() {
279-
defer log.Debugf("Static address withdrawal manager initiation " +
280-
"complete.")
281-
282-
<-m.initChan
283-
}
284-
285272
// WithdrawDeposits starts a deposits withdrawal flow. If the amount is set to 0
286273
// the full amount of the selected deposits will be withdrawn.
287274
func (m *Manager) WithdrawDeposits(ctx context.Context,

0 commit comments

Comments
 (0)