Skip to content

Commit 55dc5ad

Browse files
committed
api: connect and subscribe to streaming endpoints
1 parent cac4d93 commit 55dc5ad

File tree

8 files changed

+161
-8
lines changed

8 files changed

+161
-8
lines changed

app/src/api/lnd.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import GrpcClient from './grpc';
77
interface LndEvents {
88
transaction: LND.Transaction.AsObject;
99
channel: LND.ChannelEventUpdate.AsObject;
10-
invoice: LND.Invoice.AsObject;
1110
}
1211

1312
/**
@@ -56,6 +55,24 @@ class LndApi extends BaseApi<LndEvents> {
5655
const res = await this._grpc.request(Lightning.ListChannels, req, this._meta);
5756
return res.toObject();
5857
}
58+
59+
/**
60+
* Connect to the LND streaming endpoints
61+
*/
62+
connectStreams() {
63+
this._grpc.subscribe(
64+
Lightning.SubscribeTransactions,
65+
new LND.GetTransactionsRequest(),
66+
transaction => this.emit('transaction', transaction.toObject()),
67+
this._meta,
68+
);
69+
this._grpc.subscribe(
70+
Lightning.SubscribeChannelEvents,
71+
new LND.ChannelEventSubscription(),
72+
channelEvent => this.emit('channel', channelEvent.toObject()),
73+
this._meta,
74+
);
75+
}
5976
}
6077

6178
export default LndApi;

app/src/api/loop.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,17 @@ class LoopApi extends BaseApi<LoopEvents> {
108108
return res.toObject();
109109
}
110110

111+
/**
112+
* Connect to the Loop streaming endpoint
113+
*/
114+
connectStreams() {
115+
this._grpc.subscribe(
116+
SwapClient.Monitor,
117+
new LOOP.MonitorRequest(),
118+
swapStatus => this.emit('monitor', swapStatus.toObject()),
119+
this._meta,
120+
);
121+
}
111122
/**
112123
* Calculates the max routing fee params for loop out. this mimics the loop cli
113124
* behavior of using 2% of the amount

app/src/store/models/channel.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export default class Channel {
1313

1414
@observable chanId = '';
1515
@observable remotePubkey = '';
16+
@observable channelPoint = '';
1617
@observable capacity = Big(0);
1718
@observable localBalance = Big(0);
1819
@observable remoteBalance = Big(0);
@@ -84,6 +85,7 @@ export default class Channel {
8485
update(lndChannel: LND.Channel.AsObject) {
8586
this.chanId = lndChannel.chanId;
8687
this.remotePubkey = lndChannel.remotePubkey;
88+
this.channelPoint = lndChannel.channelPoint;
8789
this.capacity = Big(lndChannel.capacity);
8890
this.localBalance = Big(lndChannel.localBalance);
8991
this.remoteBalance = Big(lndChannel.remoteBalance);

app/src/store/store.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ export class Store {
4848
// a flag to indicate when the store has completed all of its
4949
// API requests requested during initialization
5050
@observable initialized = false;
51+
// a flag to indicate when the websocket streams are connected
52+
@observable streamsConnected = false;
5153

5254
constructor(
5355
lnd: LndApi,
@@ -83,9 +85,14 @@ export class Store {
8385
this.uiStore.goToLoop();
8486
// also fetch all the data we need
8587
this.fetchAllData();
88+
// connect and subscribe to the server-side streams
89+
this.connectToStreams();
90+
this.subscribeToStreams();
8691
} else {
8792
// go to auth page if we are not authenticated
8893
this.uiStore.gotoAuth();
94+
// unsubscribe from streams since we are no longer authenticated
95+
this.unsubscribeFromStreams();
8996
}
9097
},
9198
{ name: 'authenticatedAutorun' },
@@ -102,6 +109,39 @@ export class Store {
102109
await this.swapStore.fetchSwaps();
103110
await this.nodeStore.fetchBalances();
104111
}
112+
113+
/** connects to the LND and Loop websocket streams if not already connected */
114+
@action.bound
115+
connectToStreams() {
116+
if (this.streamsConnected) return;
117+
118+
const { lnd, loop } = this.api;
119+
lnd.connectStreams();
120+
loop.connectStreams();
121+
this.streamsConnected = true;
122+
}
123+
124+
/**
125+
* subscribes to the LND and Loop streaming endpoints
126+
*/
127+
@action.bound
128+
subscribeToStreams() {
129+
const { lnd, loop } = this.api;
130+
lnd.on('transaction', this.nodeStore.onTransaction);
131+
lnd.on('channel', this.channelStore.onChannelEvent);
132+
loop.on('monitor', this.swapStore.onSwapUpdate);
133+
}
134+
135+
/**
136+
* unsubscribes from the LND and Loop streaming endpoints
137+
*/
138+
@action.bound
139+
unsubscribeFromStreams() {
140+
const { lnd, loop } = this.api;
141+
lnd.off('transaction', this.nodeStore.onTransaction);
142+
lnd.off('channel', this.channelStore.onChannelEvent);
143+
loop.off('monitor', this.swapStore.onSwapUpdate);
144+
}
105145
}
106146

107147
/**

app/src/store/stores/buildSwapStore.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,6 @@ class BuildSwapStore {
361361
// hide the swap UI after it is complete
362362
this.cancel();
363363
this._store.uiStore.toggleProcessingSwaps();
364-
this._store.swapStore.fetchSwaps();
365364
});
366365
} catch (error) {
367366
this._store.uiStore.handleError(error, `Unable to Perform ${direction}`);

app/src/store/stores/channelStore.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,18 @@ import {
77
toJS,
88
values,
99
} from 'mobx';
10+
import { ChannelEventUpdate, ChannelPoint } from 'types/generated/lnd_pb';
1011
import Big from 'big.js';
1112
import { Store } from 'store';
1213
import { Channel } from '../models';
1314

15+
const {
16+
OPEN_CHANNEL,
17+
CLOSED_CHANNEL,
18+
ACTIVE_CHANNEL,
19+
INACTIVE_CHANNEL,
20+
} = ChannelEventUpdate.UpdateType;
21+
1422
export default class ChannelStore {
1523
private _store: Store;
1624

@@ -90,10 +98,52 @@ export default class ChannelStore {
9098
}
9199
}
92100

101+
/** update the channel list based on events from the API */
102+
@action.bound
103+
onChannelEvent(event: ChannelEventUpdate.AsObject) {
104+
this._store.log.info('handle incoming channel event', event);
105+
if (event.type === INACTIVE_CHANNEL && event.inactiveChannel) {
106+
// set the channel in state to inactive
107+
const point = this._channelPointToString(event.inactiveChannel);
108+
values(this.channels)
109+
.filter(c => c.channelPoint === point)
110+
.forEach(c => {
111+
c.active = false;
112+
this._store.log.info('updated channel', toJS(c));
113+
});
114+
} else if (event.type === ACTIVE_CHANNEL && event.activeChannel) {
115+
// set the channel in state to active
116+
const point = this._channelPointToString(event.activeChannel);
117+
values(this.channels)
118+
.filter(c => c.channelPoint === point)
119+
.forEach(c => {
120+
c.active = true;
121+
this._store.log.info('updated channel', toJS(c));
122+
});
123+
} else if (event.type === CLOSED_CHANNEL && event.closedChannel) {
124+
// delete the closed channel
125+
const channel = this.channels.get(event.closedChannel.chanId);
126+
this.channels.delete(event.closedChannel.chanId);
127+
this._store.log.info('removed closed channel', toJS(channel));
128+
} else if (event.type === OPEN_CHANNEL && event.openChannel) {
129+
// add the new opened channel
130+
const channel = new Channel(this._store, event.openChannel);
131+
this.channels.set(channel.chanId, channel);
132+
this._store.log.info('added new open channel', toJS(channel));
133+
}
134+
}
135+
93136
/** exports the sorted list of channels to CSV file */
94137
@action.bound
95138
exportChannels() {
96139
this._store.log.info('exporting Channels to a CSV file');
97140
this._store.csv.export('channels', Channel.csvColumns, toJS(this.sortedChannels));
98141
}
142+
143+
/** converts a base64 encoded channel point to a hex encoded channel point */
144+
private _channelPointToString(channelPoint: ChannelPoint.AsObject) {
145+
const txidBytes = channelPoint.fundingTxidBytes as string;
146+
const txid = Buffer.from(txidBytes, 'base64').reverse().toString('hex');
147+
return `${txid}:${channelPoint.outputIndex}`;
148+
}
99149
}

app/src/store/stores/nodeStore.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { action, observable, runInAction, toJS } from 'mobx';
2+
import { Transaction } from 'types/generated/lnd_pb';
23
import Big from 'big.js';
34
import { Store } from 'store';
45
import { Wallet } from '../models';
@@ -8,6 +9,11 @@ type NodeNetwork = 'mainnet' | 'testnet' | 'regtest';
89

910
export default class NodeStore {
1011
private _store: Store;
12+
/**
13+
* an internal list of txn ids used to prevent updating the balance
14+
* multiple times for the same transaction.
15+
*/
16+
private _knownTxns: string[] = [];
1117

1218
/** the pubkey of the LND node */
1319
@observable pubkey = '';
@@ -64,4 +70,16 @@ export default class NodeStore {
6470
this._store.uiStore.handleError(error, 'Unable to fetch balances');
6571
}
6672
}
73+
74+
/**
75+
* updates the wallet balance from the transaction provided
76+
*/
77+
@action.bound
78+
onTransaction(transaction: Transaction.AsObject) {
79+
this._store.log.info('handle incoming transaction', transaction);
80+
if (this._knownTxns.includes(transaction.txHash)) return;
81+
this._knownTxns.push(transaction.txHash);
82+
this.wallet.walletBalance = this.wallet.walletBalance.plus(transaction.amount);
83+
this._store.log.info('updated nodeStore.wallet', toJS(this.wallet));
84+
}
6785
}

app/src/store/stores/swapStore.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
toJS,
1010
values,
1111
} from 'mobx';
12+
import { SwapStatus } from 'types/generated/loop_pb';
1213
import { IS_PROD, IS_TEST } from 'config';
1314
import { Store } from 'store';
1415
import { Swap } from '../models';
@@ -89,12 +90,7 @@ export default class SwapStore {
8990
// update existing swaps or create new ones in state. using this
9091
// approach instead of overwriting the array will cause fewer state
9192
// mutations, resulting in better react rendering performance
92-
const existing = this.swaps.get(loopSwap.id);
93-
if (existing) {
94-
existing.update(loopSwap);
95-
} else {
96-
this.swaps.set(loopSwap.id, new Swap(loopSwap));
97-
}
93+
this.addOrUpdateSwap(loopSwap);
9894
});
9995
// remove any swaps in state that are not in the API response
10096
const serverIds = swapsList.map(c => c.id);
@@ -111,6 +107,26 @@ export default class SwapStore {
111107
}
112108
}
113109

110+
/** adds a new swap or updates an existing one */
111+
@action.bound
112+
addOrUpdateSwap(loopSwap: SwapStatus.AsObject) {
113+
const existing = this.swaps.get(loopSwap.id);
114+
if (existing) {
115+
existing.update(loopSwap);
116+
this._store.log.info('updated existing swap', toJS(loopSwap));
117+
} else {
118+
this.swaps.set(loopSwap.id, new Swap(loopSwap));
119+
this._store.log.info('added new swap', toJS(loopSwap));
120+
}
121+
}
122+
123+
/** updates the swap and refreshes the channel list */
124+
@action.bound
125+
onSwapUpdate(loopSwap: SwapStatus.AsObject) {
126+
this.addOrUpdateSwap(loopSwap);
127+
this._store.channelStore.fetchChannels();
128+
}
129+
114130
@action.bound
115131
startPolling() {
116132
if (this.pollingInterval) this.stopPolling();

0 commit comments

Comments
 (0)