Skip to content

Commit cac4d93

Browse files
committed
api: add websocket events to the lnd & loop apis
1 parent 05f2d96 commit cac4d93

File tree

8 files changed

+132
-39
lines changed

8 files changed

+132
-39
lines changed

app/src/__mocks__/@improbable-eng/grpc-web.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,6 @@ export const grpc = {
3131
});
3232
},
3333
),
34+
// mock client function to simulate server-side streaming
35+
client: jest.fn(),
3436
};

app/src/__stories__/StoryWrapper.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const grpc = {
2020
const response: any = { toObject: () => data };
2121
return Promise.resolve(response);
2222
},
23+
subscribe: () => undefined,
2324
};
2425

2526
// fake the AppStorage dependency so that settings aren't shared across stories

app/src/api/auth.ts

Lines changed: 0 additions & 26 deletions
This file was deleted.

app/src/api/base.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { Emitter, EventKey, EventMap, EventReceiver } from 'types/emitter';
2+
import { EventEmitter } from 'events';
3+
4+
/**
5+
* A shared base class containing logic for storing the API credentials
6+
*/
7+
class BaseApi<T extends EventMap> implements Emitter<T> {
8+
private _credentials = '';
9+
/** an internal event emitter used to track event subscriptions */
10+
private _emitter = new EventEmitter();
11+
12+
/**
13+
* Returns a metadata object containing authorization info that was
14+
* previous set if any
15+
*/
16+
protected get _meta() {
17+
return this._credentials
18+
? { authorization: `Basic ${this._credentials}` }
19+
: undefined;
20+
}
21+
22+
/**
23+
* Sets the credentials to use for all API requests
24+
* @param credentials the base64 encoded password
25+
*/
26+
setCredentials(credentials: string) {
27+
this._credentials = credentials;
28+
}
29+
30+
/**
31+
* Subscribe to have a handler function called when an event is emitted
32+
* @param eventName the name of the event
33+
* @param handler the function to call when the event is emitted
34+
*/
35+
on<K extends EventKey<T>>(eventName: K, handler: EventReceiver<T[K]>) {
36+
this._emitter.on(eventName, handler);
37+
}
38+
39+
/**
40+
* Unsubscribes the handler for the provided event
41+
* @param eventName the name of the event
42+
* @param handler the function that was used to subscribe to the event
43+
*/
44+
off<K extends EventKey<T>>(eventName: K, handler: EventReceiver<T[K]>) {
45+
this._emitter.off(eventName, handler);
46+
}
47+
48+
/**
49+
* Call all of the subscribed handlers for an event with the supplied argument
50+
* @param eventName the name of the event
51+
* @param params the argument to pass to the handlers that are subscribed
52+
* to this event
53+
*/
54+
emit<K extends EventKey<T>>(eventName: K, params: T[K]) {
55+
this._emitter.emit(eventName, params);
56+
}
57+
}
58+
59+
export default BaseApi;

app/src/api/grpc.ts

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { grpc } from '@improbable-eng/grpc-web';
22
import { ProtobufMessage } from '@improbable-eng/grpc-web/dist/typings/message';
33
import { Metadata } from '@improbable-eng/grpc-web/dist/typings/metadata';
4-
import { UnaryMethodDefinition } from '@improbable-eng/grpc-web/dist/typings/service';
4+
import {
5+
MethodDefinition,
6+
UnaryMethodDefinition,
7+
} from '@improbable-eng/grpc-web/dist/typings/service';
58
import { DEV_HOST } from 'config';
69
import { AuthenticationError } from 'util/errors';
710
import { grpcLog as log } from 'util/log';
@@ -19,30 +22,62 @@ class GrpcClient {
1922
metadata?: Metadata.ConstructorArg,
2023
): Promise<TRes> {
2124
return new Promise((resolve, reject) => {
22-
log.debug(
23-
`Request: ${methodDescriptor.service.serviceName}.${methodDescriptor.methodName}`,
24-
);
25-
log.debug(` - req: `, request.toObject());
25+
const method = `${methodDescriptor.methodName}`;
26+
log.debug(`${method} request`, request.toObject());
2627
grpc.unary(methodDescriptor, {
2728
host: DEV_HOST,
2829
request,
2930
metadata,
3031
onEnd: ({ status, statusMessage, headers, message, trailers }) => {
31-
log.debug(' - status', status, statusMessage);
32-
log.debug(' - headers', headers);
32+
log.debug(`${method} status`, status, statusMessage);
33+
log.debug(`${method} headers`, headers);
3334
if (status === grpc.Code.OK && message) {
34-
log.debug(' - message', message.toObject());
35+
log.debug(`${method} message`, message.toObject());
3536
resolve(message as TRes);
3637
} else if (status === grpc.Code.Unauthenticated) {
3738
reject(new AuthenticationError(`${statusMessage}`));
3839
} else {
3940
reject(new Error(`${status}: ${statusMessage}`));
4041
}
41-
log.debug(' - trailers', trailers);
42+
log.debug(`${method} trailers`, trailers);
4243
},
4344
});
4445
});
4546
}
47+
48+
/**
49+
* Subscribes to a GRPC server-streaming endpoint and executes the `onMessage` handler
50+
* when a new message is received from the server
51+
* @param methodDescriptor the GRPC method to call on the service
52+
* @param request the GRPC request message to send
53+
* @param onMessage the callback function to execute when a new message is received
54+
* @param metadata headers to include with the request
55+
*/
56+
subscribe<TReq extends ProtobufMessage, TRes extends ProtobufMessage>(
57+
methodDescriptor: MethodDefinition<TReq, TRes>,
58+
request: TReq,
59+
onMessage: (res: TRes) => void,
60+
metadata?: Metadata.ConstructorArg,
61+
) {
62+
const method = `${methodDescriptor.methodName}`;
63+
const client = grpc.client(methodDescriptor, {
64+
host: DEV_HOST,
65+
transport: grpc.WebsocketTransport(),
66+
});
67+
client.onHeaders(headers => {
68+
log.debug(`${method} - headers`, headers);
69+
});
70+
client.onMessage(message => {
71+
log.debug(`${method} - message`, message.toObject());
72+
onMessage(message as TRes);
73+
});
74+
client.onEnd((status, statusMessage, trailers) => {
75+
log.debug(`${method} - status`, status, statusMessage);
76+
log.debug(`${method} - trailers`, trailers);
77+
});
78+
client.start(metadata);
79+
client.send(request);
80+
}
4681
}
4782

4883
export default GrpcClient;

app/src/api/lnd.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
import * as LND from 'types/generated/lnd_pb';
22
import { Lightning } from 'types/generated/lnd_pb_service';
3-
import AuthenticatedApi from './auth';
3+
import BaseApi from './base';
44
import GrpcClient from './grpc';
55

6+
/** the names and argument types for the subscription events */
7+
interface LndEvents {
8+
transaction: LND.Transaction.AsObject;
9+
channel: LND.ChannelEventUpdate.AsObject;
10+
invoice: LND.Invoice.AsObject;
11+
}
12+
613
/**
714
* An API wrapper to communicate with the LND node via GRPC
815
*/
9-
class LndApi extends AuthenticatedApi {
16+
class LndApi extends BaseApi<LndEvents> {
1017
private _grpc: GrpcClient;
1118

1219
constructor(grpc: GrpcClient) {

app/src/api/loop.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,18 @@ import * as LOOP from 'types/generated/loop_pb';
22
import { SwapClient } from 'types/generated/loop_pb_service';
33
import { Quote } from 'types/state';
44
import Big from 'big.js';
5-
import AuthenticatedApi from './auth';
5+
import BaseApi from './base';
66
import GrpcClient from './grpc';
77

8+
/** the names and argument types for the subscription events */
9+
interface LoopEvents {
10+
monitor: LOOP.SwapStatus.AsObject;
11+
}
12+
813
/**
914
* An API wrapper to communicate with the Loop daemon via GRPC
1015
*/
11-
class LoopApi extends AuthenticatedApi {
16+
class LoopApi extends BaseApi<LoopEvents> {
1217
private _grpc: GrpcClient;
1318

1419
constructor(grpc: GrpcClient) {

app/src/types/emitter.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
export type EventMap = Record<string, any>;
2+
export type EventKey<T extends EventMap> = string & keyof T;
3+
export type EventReceiver<T> = (params: T) => void;
4+
5+
/** Generic interface to represent a type-safe pubsub Emitter object */
6+
export interface Emitter<T extends EventMap> {
7+
on<K extends EventKey<T>>(eventName: K, handler: EventReceiver<T[K]>): void;
8+
off<K extends EventKey<T>>(eventName: K, handler: EventReceiver<T[K]>): void;
9+
emit<K extends EventKey<T>>(eventName: K, params: T[K]): void;
10+
}

0 commit comments

Comments
 (0)