Skip to content

Commit ad49563

Browse files
committed
hyperloop: add hyperloop manager
1 parent 337fb3f commit ad49563

File tree

1 file changed

+250
-0
lines changed

1 file changed

+250
-0
lines changed

hyperloop/manager.go

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package hyperloop
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"time"
8+
9+
"github.com/btcsuite/btcd/btcutil"
10+
"github.com/btcsuite/btcd/chaincfg"
11+
"github.com/lightninglabs/lndclient"
12+
"github.com/lightninglabs/loop/fsm"
13+
"github.com/lightninglabs/loop/swapserverrpc"
14+
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
15+
)
16+
17+
var (
18+
// defaultFSMObserverTimeout is the default timeout we'll wait for the
19+
// fsm to reach a certain state.
20+
defaultFSMObserverTimeout = time.Second * 15
21+
)
22+
23+
// newHyperloopRequest is a request to create a new hyperloop.
24+
type newHyperloopRequest struct {
25+
amt btcutil.Amount
26+
customSweepAddr string
27+
respChan chan *Hyperloop
28+
errChan chan error
29+
}
30+
31+
// Config contains all the services that the reservation FSM needs to operate.
32+
type Config struct {
33+
// Store is the store that is used to store the hyperloop.
34+
// TODO: implement the store.
35+
Store Store
36+
37+
// Wallet handles the key derivation for the reservation.
38+
Wallet lndclient.WalletKitClient
39+
40+
// ChainNotifier is used to subscribe to block notifications.
41+
ChainNotifier lndclient.ChainNotifierClient
42+
43+
// Signer is used to sign messages.
44+
Signer lndclient.SignerClient
45+
46+
// Router is used to pay the offchain payments.
47+
Router lndclient.RouterClient
48+
49+
// HyperloopClient is the client used to communicate with the
50+
// swap server.
51+
HyperloopClient swapserverrpc.HyperloopServerClient
52+
53+
// ChainParams are the params for the bitcoin network.
54+
ChainParams *chaincfg.Params
55+
}
56+
57+
type Manager struct {
58+
cfg *Config
59+
60+
pendingHyperloops map[ID][]*FSM
61+
hyperloopRequests chan *newHyperloopRequest
62+
63+
sync.Mutex
64+
}
65+
66+
// NewManager creates a new hyperloop manager.
67+
func NewManager(cfg *Config) *Manager {
68+
return &Manager{
69+
cfg: cfg,
70+
pendingHyperloops: make(map[ID][]*FSM),
71+
hyperloopRequests: make(chan *newHyperloopRequest),
72+
}
73+
}
74+
75+
// Run starts the hyperloop manager.
76+
func (m *Manager) Run(ctx context.Context, initialBlockHeight int32) error {
77+
runCtx, cancel := context.WithCancel(ctx)
78+
defer cancel()
79+
80+
// Subscribe to blockheight.
81+
blockChan, errChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(
82+
runCtx,
83+
)
84+
if err != nil {
85+
return err
86+
}
87+
88+
blockHeight := initialBlockHeight
89+
90+
for {
91+
select {
92+
case blockHeight = <-blockChan:
93+
94+
case req := <-m.hyperloopRequests:
95+
hyperloop, err := m.newHyperLoopOut(
96+
runCtx, req.amt, req.customSweepAddr,
97+
blockHeight,
98+
)
99+
if err != nil {
100+
log.Errorf("unable to create hyperloop: %v",
101+
err)
102+
req.errChan <- err
103+
} else {
104+
req.respChan <- hyperloop
105+
}
106+
107+
case err := <-errChan:
108+
log.Errorf("unable to get block height: %v", err)
109+
return err
110+
111+
case <-runCtx.Done():
112+
return nil
113+
}
114+
}
115+
}
116+
117+
// RequestNewHyperloop requests a new hyperloop. If we have a pending hyperloop,
118+
// we'll use the same id and sweep address in order to batch a possible sweep.
119+
func (m *Manager) RequestNewHyperloop(ctx context.Context, amt btcutil.Amount,
120+
customSweepAddr string) (*Hyperloop, error) {
121+
122+
req := &newHyperloopRequest{
123+
amt: amt,
124+
customSweepAddr: customSweepAddr,
125+
respChan: make(chan *Hyperloop),
126+
errChan: make(chan error),
127+
}
128+
129+
select {
130+
case <-ctx.Done():
131+
return nil, ctx.Err()
132+
case m.hyperloopRequests <- req:
133+
}
134+
135+
var (
136+
hyperloop *Hyperloop
137+
err error
138+
)
139+
140+
select {
141+
case <-ctx.Done():
142+
return nil, ctx.Err()
143+
144+
case hyperloop = <-req.respChan:
145+
146+
case err = <-req.errChan:
147+
}
148+
149+
if err != nil {
150+
return nil, err
151+
}
152+
153+
return hyperloop, err
154+
}
155+
156+
// newHyperLoopOut creates a new hyperloop out swap. If we have a pending
157+
// hyperloop, we'll use the same id and sweep address in order to batch
158+
// a possible sweep.
159+
func (m *Manager) newHyperLoopOut(ctx context.Context, amt btcutil.Amount,
160+
customSweepAddr string, blockheight int32) (*Hyperloop, error) {
161+
162+
var (
163+
sweepAddr btcutil.Address
164+
publishDeadline time.Time
165+
err error
166+
)
167+
168+
// For now we'll set the publish deadline to 30 minutes from now.
169+
publishDeadline = time.Now().Add(time.Minute * 30)
170+
171+
// Create a sweep pk script.
172+
if customSweepAddr == "" {
173+
sweepAddr, err = m.cfg.Wallet.NextAddr(
174+
ctx, "", walletrpc.AddressType_TAPROOT_PUBKEY,
175+
false,
176+
)
177+
if err != nil {
178+
return nil, err
179+
}
180+
} else {
181+
sweepAddr, err = btcutil.DecodeAddress(
182+
customSweepAddr, m.cfg.ChainParams,
183+
)
184+
if err != nil {
185+
return nil, err
186+
}
187+
}
188+
189+
req := &initHyperloopContext{
190+
swapAmt: amt,
191+
sweepAddr: sweepAddr,
192+
publishTime: publishDeadline,
193+
initiationHeight: blockheight,
194+
// We'll only do private hyperloops for now.
195+
private: true,
196+
}
197+
198+
// Create a new hyperloop fsm.
199+
hl, err := NewFSM(m.cfg, m)
200+
if err != nil {
201+
return nil, err
202+
}
203+
204+
go func() {
205+
err = hl.SendEvent(ctx, OnStart, req)
206+
if err != nil {
207+
log.Errorf("unable to send event to hyperloop fsm: %v",
208+
err)
209+
}
210+
}()
211+
212+
err = hl.DefaultObserver.WaitForState(
213+
ctx, defaultFSMObserverTimeout, WaitForPublish,
214+
fsm.WithAbortEarlyOnErrorOption(),
215+
)
216+
if err != nil {
217+
return nil, err
218+
}
219+
220+
m.Lock()
221+
m.pendingHyperloops[hl.hyperloop.ID] = append(
222+
m.pendingHyperloops[hl.hyperloop.ID], hl,
223+
)
224+
m.Unlock()
225+
226+
hyperloop := hl.GetVal()
227+
return hyperloop, nil
228+
}
229+
230+
// fetchHyperLoopTotalSweepAmt returns the total amount that will be swept in
231+
// the hyperloop for the given hyperloop id and sweep address.
232+
func (m *Manager) fetchHyperLoopTotalSweepAmt(hyperloopID ID,
233+
sweepAddr btcutil.Address) (btcutil.Amount, error) {
234+
235+
m.Lock()
236+
defer m.Unlock()
237+
if hyperloops, ok := m.pendingHyperloops[hyperloopID]; ok {
238+
var totalAmt btcutil.Amount
239+
for _, hyperloop := range hyperloops {
240+
hl := hyperloop.GetVal()
241+
if hl.SweepAddr.String() == sweepAddr.String() {
242+
totalAmt += hl.Amt
243+
}
244+
}
245+
246+
return totalAmt, nil
247+
}
248+
249+
return 0, errors.New("hyperloop not found")
250+
}

0 commit comments

Comments
 (0)