Skip to content

Commit 1ad4b85

Browse files
committed
mailbox: minor adjustments to ClientConn
In this commit, a few ClientConn variable names are changed and a few other minor adjustments are made such as adding a cancel function. This is in preparation for an upcoming refactor.
1 parent ddd9420 commit 1ad4b85

File tree

1 file changed

+68
-57
lines changed

1 file changed

+68
-57
lines changed

mailbox/client_conn.go

Lines changed: 68 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -121,22 +121,22 @@ func (c ClientStatus) String() string {
121121
type ClientConn struct {
122122
*connKit
123123

124-
receiveSocket *websocket.Conn
125-
receiveStreamMu sync.Mutex
124+
receiveSocket *websocket.Conn
125+
receiveMu sync.Mutex
126126

127-
sendSocket *websocket.Conn
128-
sendStreamMu sync.Mutex
127+
sendSocket *websocket.Conn
128+
sendMu sync.Mutex
129129

130130
gbnConn *gbn.GoBackNConn
131131
gbnOptions []gbn.Option
132132

133-
closeOnce sync.Once
134-
135133
status ClientStatus
136134
onNewStatus func(status ClientStatus)
137135
statusMu sync.Mutex
138136

139-
quit chan struct{}
137+
quit chan struct{}
138+
cancel func()
139+
closeOnce sync.Once
140140
}
141141

142142
// NewClientConn creates a new client connection with the given receive and send
@@ -151,6 +151,7 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
151151
log.Debugf("New client conn, read_stream=%x, write_stream=%x",
152152
receiveSID[:], sendSID[:])
153153

154+
ctxc, cancel := context.WithCancel(ctx)
154155
c := &ClientConn{
155156
gbnOptions: []gbn.Option{
156157
gbn.WithTimeout(gbnTimeout),
@@ -162,17 +163,18 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
162163
status: ClientStatusNotConnected,
163164
onNewStatus: onNewStatus,
164165
quit: make(chan struct{}),
166+
cancel: cancel,
165167
}
166168
c.connKit = &connKit{
167-
ctx: ctx,
169+
ctx: ctxc,
168170
impl: c,
169171
receiveSID: receiveSID,
170172
sendSID: sendSID,
171173
serverAddr: serverHost,
172174
}
173175

174176
gbnConn, err := gbn.NewClientConn(
175-
ctx, gbnN, c.sendToStream, c.recvFromStream, c.gbnOptions...,
177+
ctxc, gbnN, c.send, c.recv, c.gbnOptions...,
176178
)
177179
if err != nil {
178180
return nil, err
@@ -188,25 +190,25 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string,
188190
func RefreshClientConn(ctx context.Context, c *ClientConn) (*ClientConn,
189191
error) {
190192

191-
c.sendStreamMu.Lock()
192-
defer c.sendStreamMu.Unlock()
193+
c.sendMu.Lock()
194+
defer c.sendMu.Unlock()
193195

194-
c.receiveStreamMu.Lock()
195-
defer c.receiveStreamMu.Unlock()
196+
c.receiveMu.Lock()
197+
defer c.receiveMu.Unlock()
198+
199+
c.statusMu.Lock()
200+
defer c.statusMu.Unlock()
196201

197202
log.Debugf("Refreshing client conn, read_stream=%x, write_stream=%x",
198203
c.receiveSID[:], c.sendSID[:])
199204

200205
cc := &ClientConn{
201-
receiveSocket: c.receiveSocket,
202-
sendSocket: c.sendSocket,
203-
status: ClientStatusNotConnected,
204-
onNewStatus: c.onNewStatus,
205-
gbnOptions: c.gbnOptions,
206-
quit: make(chan struct{}),
206+
status: ClientStatusNotConnected,
207+
onNewStatus: c.onNewStatus,
208+
gbnOptions: c.gbnOptions,
209+
cancel: c.cancel,
210+
quit: make(chan struct{}),
207211
}
208-
c.sendSocket = nil
209-
c.receiveSocket = nil
210212

211213
cc.connKit = &connKit{
212214
ctx: ctx,
@@ -217,7 +219,7 @@ func RefreshClientConn(ctx context.Context, c *ClientConn) (*ClientConn,
217219
}
218220

219221
gbnConn, err := gbn.NewClientConn(
220-
ctx, gbnN, cc.sendToStream, cc.recvFromStream, cc.gbnOptions...,
222+
ctx, gbnN, cc.send, cc.recv, cc.gbnOptions...,
221223
)
222224
if err != nil {
223225
return nil, err
@@ -267,16 +269,16 @@ func statusFromError(err error) ClientStatus {
267269
}
268270
}
269271

270-
// recvFromStream is used to receive a payload from the receive socket.
271-
// The function is passed to and used by the gbn connection.
272-
// It therefore takes in and reacts on the cancellation of a context so that
273-
// the gbn connection is able to close independently of the ClientConn.
274-
func (c *ClientConn) recvFromStream(ctx context.Context) ([]byte, error) {
275-
c.receiveStreamMu.Lock()
272+
// recv is used to receive a payload from the receive socket. The function is
273+
// passed to and used by the gbn connection. It therefore takes in and reacts
274+
// on the cancellation of a context so that the gbn connection is able to close
275+
// independently of the ClientConn.
276+
func (c *ClientConn) recv(ctx context.Context) ([]byte, error) {
277+
c.receiveMu.Lock()
276278
if c.receiveSocket == nil {
277279
c.createReceiveMailBox(ctx, 0)
278280
}
279-
c.receiveStreamMu.Unlock()
281+
c.receiveMu.Unlock()
280282

281283
for {
282284
select {
@@ -287,15 +289,15 @@ func (c *ClientConn) recvFromStream(ctx context.Context) ([]byte, error) {
287289
default:
288290
}
289291

290-
c.receiveStreamMu.Lock()
292+
c.receiveMu.Lock()
291293
_, msg, err := c.receiveSocket.Read(ctx)
292294
if err != nil {
293295
log.Debugf("Client: got failure on receive socket, "+
294296
"re-trying: %v", err)
295297

296298
c.setStatus(ClientStatusNotConnected)
297299
c.createReceiveMailBox(ctx, retryWait)
298-
c.receiveStreamMu.Unlock()
300+
c.receiveMu.Unlock()
299301
continue
300302
}
301303
unwrapped, err := stripJSONWrapper(string(msg))
@@ -305,10 +307,10 @@ func (c *ClientConn) recvFromStream(ctx context.Context) ([]byte, error) {
305307

306308
c.setStatus(statusFromError(err))
307309
c.createReceiveMailBox(ctx, retryWait)
308-
c.receiveStreamMu.Unlock()
310+
c.receiveMu.Unlock()
309311
continue
310312
}
311-
c.receiveStreamMu.Unlock()
313+
c.receiveMu.Unlock()
312314

313315
mailboxMsg := &hashmailrpc.CipherBox{}
314316
err = defaultMarshaler.Unmarshal([]byte(unwrapped), mailboxMsg)
@@ -322,17 +324,17 @@ func (c *ClientConn) recvFromStream(ctx context.Context) ([]byte, error) {
322324
}
323325
}
324326

325-
// sendToStream is used to send a payload on the send socket. The function
326-
// is passed to and used by the gbn connection. It therefore takes in and
327-
// reacts on the cancellation of a context so that the gbn connection is able to
328-
// close independently of the ClientConn.
329-
func (c *ClientConn) sendToStream(ctx context.Context, payload []byte) error {
330-
// Set up the send socket if it has not yet been initialized.
331-
c.sendStreamMu.Lock()
327+
// send is used to send a payload on the send socket. The function is passed to
328+
// and used by the gbn connection. It therefore takes in and reacts on the
329+
// cancellation of a context so that the gbn connection is able to close
330+
// independently of the ClientConn.
331+
func (c *ClientConn) send(ctx context.Context, payload []byte) error {
332+
// Set up the send-socket if it has not yet been initialized.
333+
c.sendMu.Lock()
332334
if c.sendSocket == nil {
333335
c.createSendMailBox(ctx, 0)
334336
}
335-
c.sendStreamMu.Unlock()
337+
c.sendMu.Unlock()
336338

337339
// Retry sending the payload to the hashmail server until it succeeds.
338340
for {
@@ -356,7 +358,7 @@ func (c *ClientConn) sendToStream(ctx context.Context, payload []byte) error {
356358
return err
357359
}
358360

359-
c.sendStreamMu.Lock()
361+
c.sendMu.Lock()
360362
ctxt, cancel := context.WithTimeout(ctx, sendSocketTimeout)
361363
err = c.sendSocket.Write(
362364
ctxt, websocket.MessageText, sendInitBytes,
@@ -368,10 +370,10 @@ func (c *ClientConn) sendToStream(ctx context.Context, payload []byte) error {
368370

369371
c.setStatus(statusFromError(err))
370372
c.createSendMailBox(ctx, retryWait)
371-
c.sendStreamMu.Unlock()
373+
c.sendMu.Unlock()
372374
continue
373375
}
374-
c.sendStreamMu.Unlock()
376+
c.sendMu.Unlock()
375377

376378
return nil
377379
}
@@ -516,37 +518,46 @@ func (c *ClientConn) Close() error {
516518
c.closeOnce.Do(func() {
517519
log.Debugf("Closing client connection")
518520

519-
if err := c.gbnConn.Close(); err != nil {
520-
log.Debugf("Error closing gbn connection: %v", err)
521+
if c.gbnConn != nil {
522+
if err := c.gbnConn.Close(); err != nil {
523+
log.Debugf("Error closing gbn connection: %v",
524+
err)
525+
526+
returnErr = err
527+
}
521528
}
522529

523-
c.receiveStreamMu.Lock()
530+
c.receiveMu.Lock()
524531
if c.receiveSocket != nil {
525532
log.Debugf("sending bye on receive socket")
526-
returnErr = c.receiveSocket.Close(
533+
err := c.receiveSocket.Close(
527534
websocket.StatusNormalClosure, "bye",
528535
)
529-
if returnErr != nil {
536+
if err != nil {
530537
log.Errorf("Error closing receive socket: %v",
531-
returnErr)
538+
err)
539+
540+
returnErr = err
532541
}
533542
}
534-
c.receiveStreamMu.Unlock()
543+
c.receiveMu.Unlock()
535544

536-
c.sendStreamMu.Lock()
545+
c.sendMu.Lock()
537546
if c.sendSocket != nil {
538547
log.Debugf("sending bye on send socket")
539-
returnErr = c.sendSocket.Close(
548+
err := c.sendSocket.Close(
540549
websocket.StatusNormalClosure, "bye",
541550
)
542-
if returnErr != nil {
543-
log.Errorf("Error closing send socket: %v",
544-
returnErr)
551+
if err != nil {
552+
log.Errorf("Error closing send socket: %v", err)
553+
554+
returnErr = err
545555
}
546556
}
547-
c.sendStreamMu.Unlock()
557+
c.sendMu.Unlock()
548558

549559
close(c.quit)
560+
c.cancel()
550561
})
551562

552563
return returnErr

0 commit comments

Comments
 (0)