Skip to content

Commit a433bfd

Browse files
committed
mailbox: ClientConnTransport interface
In this commit, a new ClientConnTransport interface is added and a websockets implementation of it is added. Various adjustments are made to the ClientConn so that it uses the new interface instead of directly using websockets.
1 parent 1ad4b85 commit a433bfd

File tree

2 files changed

+281
-129
lines changed

2 files changed

+281
-129
lines changed

mailbox/client_conn.go

Lines changed: 50 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ import (
1010

1111
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
1212
"github.com/lightninglabs/lightning-node-connect/gbn"
13-
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
1413
"google.golang.org/protobuf/encoding/protojson"
15-
"nhooyr.io/websocket"
1614
)
1715

1816
var (
@@ -74,16 +72,6 @@ const (
7472
// gbnPongTimout is the time after sending the pong message that we will
7573
// timeout if we do not receive any message from our peer.
7674
gbnPongTimeout = 3 * time.Second
77-
78-
// webSocketRecvLimit is used to set the websocket receive limit. The
79-
// default value of 32KB is enough due to the fact that grpc has a
80-
// default packet maximum of 32KB which we then further wrap in gbn and
81-
// hashmail messages.
82-
webSocketRecvLimit int64 = 100 * 1024 // 100KB
83-
84-
// sendSocketTimeout is the timeout used for context cancellation on the
85-
// send socket.
86-
sendSocketTimeout = 1000 * time.Millisecond
8775
)
8876

8977
// ClientStatus is a description of the connection status of the client.
@@ -121,11 +109,9 @@ func (c ClientStatus) String() string {
121109
type ClientConn struct {
122110
*connKit
123111

124-
receiveSocket *websocket.Conn
125-
receiveMu sync.Mutex
126-
127-
sendSocket *websocket.Conn
128-
sendMu sync.Mutex
112+
transport ClientConnTransport
113+
receiveMu sync.Mutex
114+
sendMu sync.Mutex
129115

130116
gbnConn *gbn.GoBackNConn
131117
gbnOptions []gbn.Option
@@ -147,12 +133,18 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
147133

148134
receiveSID := GetSID(sid, true)
149135
sendSID := GetSID(sid, false)
136+
mailBoxInfo := &mailboxInfo{
137+
addr: serverHost,
138+
recvSID: receiveSID[:],
139+
sendSID: sendSID[:],
140+
}
150141

151142
log.Debugf("New client conn, read_stream=%x, write_stream=%x",
152143
receiveSID[:], sendSID[:])
153144

154145
ctxc, cancel := context.WithCancel(ctx)
155146
c := &ClientConn{
147+
transport: newWebsocketTransport(mailBoxInfo),
156148
gbnOptions: []gbn.Option{
157149
gbn.WithTimeout(gbnTimeout),
158150
gbn.WithHandshakeTimeout(gbnHandshakeTimeout),
@@ -203,6 +195,7 @@ func RefreshClientConn(ctx context.Context, c *ClientConn) (*ClientConn,
203195
c.receiveSID[:], c.sendSID[:])
204196

205197
cc := &ClientConn{
198+
transport: c.transport.Refresh(),
206199
status: ClientStatusNotConnected,
207200
onNewStatus: c.onNewStatus,
208201
gbnOptions: c.gbnOptions,
@@ -275,10 +268,11 @@ func statusFromError(err error) ClientStatus {
275268
// independently of the ClientConn.
276269
func (c *ClientConn) recv(ctx context.Context) ([]byte, error) {
277270
c.receiveMu.Lock()
278-
if c.receiveSocket == nil {
271+
defer c.receiveMu.Unlock()
272+
273+
if !c.transport.ReceiveConnected() {
279274
c.createReceiveMailBox(ctx, 0)
280275
}
281-
c.receiveMu.Unlock()
282276

283277
for {
284278
select {
@@ -289,38 +283,22 @@ func (c *ClientConn) recv(ctx context.Context) ([]byte, error) {
289283
default:
290284
}
291285

292-
c.receiveMu.Lock()
293-
_, msg, err := c.receiveSocket.Read(ctx)
286+
msg, retry, errStatus, err := c.transport.Recv(ctx)
294287
if err != nil {
295-
log.Debugf("Client: got failure on receive socket, "+
296-
"re-trying: %v", err)
288+
if !retry {
289+
return nil, err
290+
}
297291

298-
c.setStatus(ClientStatusNotConnected)
299-
c.createReceiveMailBox(ctx, retryWait)
300-
c.receiveMu.Unlock()
301-
continue
302-
}
303-
unwrapped, err := stripJSONWrapper(string(msg))
304-
if err != nil {
305-
log.Debugf("Client: got error message from receive "+
306-
"socket: %v", err)
292+
log.Debugf("Client: got failure on receive "+
293+
"socket/stream, re-trying: %v", err)
307294

308-
c.setStatus(statusFromError(err))
295+
c.setStatus(errStatus)
309296
c.createReceiveMailBox(ctx, retryWait)
310-
c.receiveMu.Unlock()
311297
continue
312298
}
313-
c.receiveMu.Unlock()
314-
315-
mailboxMsg := &hashmailrpc.CipherBox{}
316-
err = defaultMarshaler.Unmarshal([]byte(unwrapped), mailboxMsg)
317-
if err != nil {
318-
return nil, err
319-
}
320299

321300
c.setStatus(ClientStatusConnected)
322-
323-
return mailboxMsg.Msg, nil
301+
return msg, nil
324302
}
325303
}
326304

@@ -329,12 +307,13 @@ func (c *ClientConn) recv(ctx context.Context) ([]byte, error) {
329307
// cancellation of a context so that the gbn connection is able to close
330308
// independently of the ClientConn.
331309
func (c *ClientConn) send(ctx context.Context, payload []byte) error {
332-
// Set up the send-socket if it has not yet been initialized.
333310
c.sendMu.Lock()
334-
if c.sendSocket == nil {
311+
defer c.sendMu.Unlock()
312+
313+
// Set up the send-socket if it has not yet been initialized.
314+
if !c.transport.SendConnected() {
335315
c.createSendMailBox(ctx, 0)
336316
}
337-
c.sendMu.Unlock()
338317

339318
// Retry sending the payload to the hashmail server until it succeeds.
340319
for {
@@ -346,34 +325,21 @@ func (c *ClientConn) send(ctx context.Context, payload []byte) error {
346325
default:
347326
}
348327

349-
sendInit := &hashmailrpc.CipherBox{
350-
Desc: &hashmailrpc.CipherBoxDesc{
351-
StreamId: c.sendSID[:],
352-
},
353-
Msg: payload,
354-
}
355-
356-
sendInitBytes, err := defaultMarshaler.Marshal(sendInit)
357-
if err != nil {
358-
return err
359-
}
360-
361-
c.sendMu.Lock()
362-
ctxt, cancel := context.WithTimeout(ctx, sendSocketTimeout)
363-
err = c.sendSocket.Write(
364-
ctxt, websocket.MessageText, sendInitBytes,
328+
retry, errStatus, err := c.transport.Send(
329+
ctx, c.sendSID[:], payload,
365330
)
366-
cancel()
367331
if err != nil {
368-
log.Debugf("Client: got failure on send socket, "+
369-
"re-trying: %v", err)
332+
if !retry {
333+
return err
334+
}
335+
336+
log.Debugf("Client: got failure on send "+
337+
"socket/stream, re-trying: %v", err)
370338

371-
c.setStatus(statusFromError(err))
339+
c.setStatus(errStatus)
372340
c.createSendMailBox(ctx, retryWait)
373-
c.sendMu.Unlock()
374341
continue
375342
}
376-
c.sendMu.Unlock()
377343

378344
return nil
379345
}
@@ -400,38 +366,9 @@ func (c *ClientConn) createReceiveMailBox(ctx context.Context,
400366

401367
waiter.Wait()
402368

403-
receiveAddr := fmt.Sprintf(
404-
addrFormat, c.serverAddr, receivePath,
405-
)
406-
receiveSocket, _, err := websocket.Dial(ctx, receiveAddr, nil)
407-
if err != nil {
408-
log.Debugf("Client: error creating receive socket %v",
409-
err)
410-
411-
continue
412-
}
413-
receiveSocket.SetReadLimit(webSocketRecvLimit)
414-
c.receiveSocket = receiveSocket
415-
416-
receiveInit := &hashmailrpc.CipherBoxDesc{
417-
StreamId: c.receiveSID[:],
418-
}
419-
receiveInitBytes, err := defaultMarshaler.Marshal(receiveInit)
420-
if err != nil {
421-
log.Debugf("Client: error marshaling receive init "+
422-
"bytes %w", err)
423-
424-
continue
425-
}
426-
427-
ctxt, cancel := context.WithTimeout(ctx, sendSocketTimeout)
428-
err = c.receiveSocket.Write(
429-
ctxt, websocket.MessageText, receiveInitBytes,
430-
)
431-
cancel()
432-
if err != nil {
433-
log.Debugf("Client: error creating receive stream "+
434-
"%v", err)
369+
if err := c.transport.ConnectReceive(ctx); err != nil {
370+
log.Errorf("Client: error connecting to receive " +
371+
"socket/stream: %v")
435372

436373
continue
437374
}
@@ -459,17 +396,14 @@ func (c *ClientConn) createSendMailBox(ctx context.Context,
459396

460397
waiter.Wait()
461398

462-
log.Debugf("Client: Attempting to create send socket")
463-
sendAddr := fmt.Sprintf(addrFormat, c.serverAddr, sendPath)
464-
sendSocket, _, err := websocket.Dial(ctx, sendAddr, nil)
465-
if err != nil {
466-
log.Debugf("Client: error creating send socket %v", err)
399+
log.Debugf("Client: Attempting to create send socket/stream")
400+
if err := c.transport.ConnectSend(ctx); err != nil {
401+
log.Debugf("Client: error connecting to send "+
402+
"stream/socket %v", err)
467403
continue
468404
}
469405

470-
c.sendSocket = sendSocket
471-
472-
log.Debugf("Client: Send socket created")
406+
log.Debugf("Client: Connected to send socket/stream")
473407
return
474408
}
475409
}
@@ -528,31 +462,18 @@ func (c *ClientConn) Close() error {
528462
}
529463

530464
c.receiveMu.Lock()
531-
if c.receiveSocket != nil {
532-
log.Debugf("sending bye on receive socket")
533-
err := c.receiveSocket.Close(
534-
websocket.StatusNormalClosure, "bye",
535-
)
536-
if err != nil {
537-
log.Errorf("Error closing receive socket: %v",
538-
err)
539-
540-
returnErr = err
541-
}
465+
log.Debugf("closing receive stream/socket")
466+
if err := c.transport.CloseReceive(); err != nil {
467+
log.Errorf("Error closing receive stream/socket: %v", err)
468+
returnErr = err
542469
}
543470
c.receiveMu.Unlock()
544471

545472
c.sendMu.Lock()
546-
if c.sendSocket != nil {
547-
log.Debugf("sending bye on send socket")
548-
err := c.sendSocket.Close(
549-
websocket.StatusNormalClosure, "bye",
550-
)
551-
if err != nil {
552-
log.Errorf("Error closing send socket: %v", err)
553-
554-
returnErr = err
555-
}
473+
log.Debugf("closing send stream/socket")
474+
if err := c.transport.CloseSend(); err != nil {
475+
log.Errorf("Error closing send stream/socket: %v", err)
476+
returnErr = err
556477
}
557478
c.sendMu.Unlock()
558479

0 commit comments

Comments
 (0)