Skip to content

Commit 0562016

Browse files
committed
multi: grpc ClientConnTransport impl
In this commit, a new grpc implementation of the ClientConnTransport interface is added. Various changes are made to Client and ClientConn so that a caller can choose which implementation of ClientConnTransport should be used.
1 parent a433bfd commit 0562016

File tree

5 files changed

+196
-13
lines changed

5 files changed

+196
-13
lines changed

cmd/wasm-client/lnd_conn.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ func mailboxRPCConnection(mailboxServer, pairingPhrase string,
3030
)
3131

3232
ctx := context.Background()
33-
transportConn, err := mailbox.NewClient(ctx, connData)
33+
transportConn, err := mailbox.NewWebsocketsClient(
34+
ctx, mailboxServer, connData,
35+
)
3436
if err != nil {
3537
return nil, nil, err
3638
}

itest/client_harness.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ func (c *clientHarness) start() error {
6262
},
6363
)
6464

65-
transportConn, err := mailbox.NewClient(ctx, connData)
65+
transportConn, err := mailbox.NewWebsocketsClient(
66+
ctx, c.serverAddr, connData,
67+
)
6668
if err != nil {
6769
return err
6870
}

mailbox/client.go

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,23 @@ package mailbox
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"net"
78
"sync"
9+
10+
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
11+
"google.golang.org/grpc"
812
)
913

1014
// Client manages the mailboxConn it holds and refreshes it on connection
1115
// retries.
1216
type Client struct {
17+
serverHost string
18+
connData *ConnData
19+
1320
mailboxConn *ClientConn
1421

15-
connData *ConnData
22+
grpcClient hashmailrpc.HashMailClient
1623

1724
status ClientStatus
1825
statusMu sync.Mutex
@@ -22,27 +29,56 @@ type Client struct {
2229
ctx context.Context
2330
}
2431

25-
// NewClient creates a new Client object which will handle the mailbox
26-
// connection.
27-
func NewClient(ctx context.Context, connData *ConnData) (*Client, error) {
32+
// NewGrpcClient creates a new Client object which will handle the mailbox
33+
// connection and will use grpc streams to connect to the mailbox.
34+
func NewGrpcClient(ctx context.Context, serverHost string, connData *ConnData,
35+
dialOpts ...grpc.DialOption) (*Client, error) {
36+
37+
mailboxGrpcConn, err := grpc.Dial(serverHost, dialOpts...)
38+
if err != nil {
39+
return nil, fmt.Errorf("unable to connect to RPC server: %v",
40+
err)
41+
}
42+
43+
sid, err := connData.SID()
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
return &Client{
49+
ctx: ctx,
50+
serverHost: serverHost,
51+
connData: connData,
52+
grpcClient: hashmailrpc.NewHashMailClient(mailboxGrpcConn),
53+
status: ClientStatusNotConnected,
54+
sid: sid,
55+
}, nil
56+
}
57+
58+
// NewWebsocketsClient creates a new Client object which will handle the mailbox
59+
// connection and will use websockets to connect to the mailbox.
60+
func NewWebsocketsClient(ctx context.Context, serverHost string,
61+
connData *ConnData) (*Client, error) {
62+
2863
sid, err := connData.SID()
2964
if err != nil {
3065
return nil, err
3166
}
3267

3368
return &Client{
34-
ctx: ctx,
35-
connData: connData,
36-
status: ClientStatusNotConnected,
37-
sid: sid,
69+
ctx: ctx,
70+
serverHost: serverHost,
71+
connData: connData,
72+
status: ClientStatusNotConnected,
73+
sid: sid,
3874
}, nil
3975
}
4076

4177
// Dial returns a net.Conn abstraction over the mailbox connection. Dial is
4278
// called everytime grpc retries the connection. If this is the first
4379
// connection, a new ClientConn will be created. Otherwise, the existing
4480
// connection will just be refreshed.
45-
func (c *Client) Dial(_ context.Context, serverHost string) (net.Conn, error) {
81+
func (c *Client) Dial(_ context.Context, _ string) (net.Conn, error) {
4682
// If there is currently an active connection, block here until the
4783
// previous connection as been closed.
4884
if c.mailboxConn != nil {
@@ -73,7 +109,8 @@ func (c *Client) Dial(_ context.Context, serverHost string) (net.Conn, error) {
73109

74110
if c.mailboxConn == nil {
75111
mailboxConn, err := NewClientConn(
76-
c.ctx, c.sid, serverHost, func(status ClientStatus) {
112+
c.ctx, c.sid, c.serverHost, c.grpcClient,
113+
func(status ClientStatus) {
77114
c.statusMu.Lock()
78115
c.status = status
79116
c.statusMu.Unlock()

mailbox/client_conn.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
1212
"github.com/lightninglabs/lightning-node-connect/gbn"
13+
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
1314
"google.golang.org/protobuf/encoding/protojson"
1415
)
1516

@@ -129,6 +130,7 @@ type ClientConn struct {
129130
// session identifiers. The context given as the first parameter will be used
130131
// throughout the connection lifetime.
131132
func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
133+
client hashmailrpc.HashMailClient,
132134
onNewStatus func(status ClientStatus)) (*ClientConn, error) {
133135

134136
receiveSID := GetSID(sid, true)
@@ -143,8 +145,16 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
143145
receiveSID[:], sendSID[:])
144146

145147
ctxc, cancel := context.WithCancel(ctx)
148+
149+
var transport ClientConnTransport
150+
if client != nil {
151+
transport = newGrpcTransport(mailBoxInfo, client)
152+
} else {
153+
transport = newWebsocketTransport(mailBoxInfo)
154+
}
155+
146156
c := &ClientConn{
147-
transport: newWebsocketTransport(mailBoxInfo),
157+
transport: transport,
148158
gbnOptions: []gbn.Option{
149159
gbn.WithTimeout(gbnTimeout),
150160
gbn.WithHandshakeTimeout(gbnHandshakeTimeout),

mailbox/client_transport.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,135 @@ func (wt *websocketTransport) CloseSend() error {
229229

230230
return wt.sendSocket.Close(websocket.StatusNormalClosure, "bye")
231231
}
232+
233+
// grpcTransport is an implementation of ClientConnTransport that uses grpc
234+
// streams to connect the the mailbox.
235+
type grpcTransport struct {
236+
*mailboxInfo
237+
238+
client hashmailrpc.HashMailClient
239+
receiveStream hashmailrpc.HashMail_RecvStreamClient
240+
sendStream hashmailrpc.HashMail_SendStreamClient
241+
}
242+
243+
// newGrpcTransport constructs a new grpcTransport instance.
244+
func newGrpcTransport(mbInfo *mailboxInfo,
245+
client hashmailrpc.HashMailClient) *grpcTransport {
246+
247+
return &grpcTransport{
248+
client: client,
249+
mailboxInfo: mbInfo,
250+
}
251+
}
252+
253+
// Refresh creates a new ClientConnTransport with no initialised send or receive
254+
// connections.
255+
//
256+
// NOTE: this is part of the ClientConnTransport interface.
257+
func (gt *grpcTransport) Refresh() ClientConnTransport {
258+
return &grpcTransport{
259+
client: gt.client,
260+
mailboxInfo: gt.mailboxInfo,
261+
}
262+
}
263+
264+
// ReceiveConnected returns true if the transport is connected to the
265+
// hashmail-server receive stream.
266+
//
267+
// NOTE: this is part of the ClientConnTransport interface.
268+
func (gt *grpcTransport) ReceiveConnected() bool {
269+
return gt.receiveStream != nil
270+
}
271+
272+
// SendConnected returns true if the transport is connected to the
273+
// hashmail-server send stream.
274+
//
275+
// NOTE: this is part of the ClientConnTransport interface.
276+
func (gt *grpcTransport) SendConnected() bool {
277+
return gt.sendStream != nil
278+
}
279+
280+
// ConnectSend can be called in order to initialise the send-stream with
281+
// the hashmail-server.
282+
//
283+
// NOTE: this is part of the ClientConnTransport interface.
284+
func (gt *grpcTransport) ConnectSend(ctx context.Context) error {
285+
sendStream, err := gt.client.SendStream(ctx)
286+
if err != nil {
287+
return err
288+
}
289+
290+
gt.sendStream = sendStream
291+
return nil
292+
}
293+
294+
// ConnectReceive can be called in order to initialise the receive-stream with
295+
// the hashmail-server.
296+
//
297+
// NOTE: this is part of the ClientConnTransport interface.
298+
func (gt *grpcTransport) ConnectReceive(ctx context.Context) error {
299+
receiveInit := &hashmailrpc.CipherBoxDesc{StreamId: gt.recvSID}
300+
readStream, err := gt.client.RecvStream(ctx, receiveInit)
301+
if err != nil {
302+
return err
303+
}
304+
305+
gt.receiveStream = readStream
306+
return nil
307+
}
308+
309+
// Recv will attempt to read data off of the underlying transport's
310+
// receive stream.
311+
//
312+
// NOTE: this is part of the ClientConnTransport interface.
313+
func (gt *grpcTransport) Recv(_ context.Context) ([]byte, bool, ClientStatus,
314+
error) {
315+
316+
controlMsg, err := gt.receiveStream.Recv()
317+
if err != nil {
318+
return nil, true, statusFromError(err), err
319+
}
320+
321+
return controlMsg.Msg, false, ClientStatusConnected, nil
322+
}
323+
324+
// Send will attempt to send data on the underlying transport's send-stream.
325+
//
326+
// NOTE: this is part of the ClientConnTransport interface.
327+
func (gt *grpcTransport) Send(_ context.Context, streamID, payload []byte) (
328+
bool, ClientStatus, error) {
329+
330+
err := gt.sendStream.Send(&hashmailrpc.CipherBox{
331+
Desc: &hashmailrpc.CipherBoxDesc{
332+
StreamId: streamID,
333+
},
334+
Msg: payload,
335+
})
336+
if err != nil {
337+
return true, ClientStatusNotConnected, err
338+
}
339+
340+
return false, ClientStatusConnected, nil
341+
}
342+
343+
// CloseReceive will close the transport's connection to the receive-stream.
344+
//
345+
// NOTE: this is part of the ClientConnTransport interface.
346+
func (gt *grpcTransport) CloseReceive() error {
347+
if gt.receiveStream == nil {
348+
return nil
349+
}
350+
351+
return gt.receiveStream.CloseSend()
352+
}
353+
354+
// CloseSend will close the transport's connection to the send-stream.
355+
//
356+
// NOTE: this is part of the ClientConnTransport interface.
357+
func (gt *grpcTransport) CloseSend() error {
358+
if gt.sendStream == nil {
359+
return nil
360+
}
361+
362+
return gt.sendStream.CloseSend()
363+
}

0 commit comments

Comments
 (0)