@@ -28,6 +28,10 @@ const (
28
28
// then we'll allow it up to this burst allowance.
29
29
DefaultMsgBurstAllowance = 10
30
30
31
+ // DefaultStaleTimeout is the time after which a mailbox will be torn
32
+ // down if neither of its streams are occupied.
33
+ DefaultStaleTimeout = time .Hour
34
+
31
35
// DefaultBufSize is the default number of bytes that are read in a
32
36
// single operation.
33
37
DefaultBufSize = 4096
@@ -185,11 +189,14 @@ type stream struct {
185
189
wg sync.WaitGroup
186
190
187
191
limiter * rate.Limiter
192
+
193
+ status * streamStatus
188
194
}
189
195
190
196
// newStream creates a new stream independent of any given stream ID.
191
197
func newStream (id streamID , limiter * rate.Limiter ,
192
- equivAuth func (auth * hashmailrpc.CipherBoxAuth ) error ) * stream {
198
+ equivAuth func (auth * hashmailrpc.CipherBoxAuth ) error ,
199
+ onStale func () error , staleTimeout time.Duration ) * stream {
193
200
194
201
// Our stream is actually just a plain io.Pipe. This allows us to avoid
195
202
// having to do things like rate limiting, etc as we can limit the
@@ -204,6 +211,7 @@ func newStream(id streamID, limiter *rate.Limiter,
204
211
id : id ,
205
212
equivAuth : equivAuth ,
206
213
limiter : limiter ,
214
+ status : newStreamStatus (onStale , staleTimeout ),
207
215
readBytesChan : make (chan []byte ),
208
216
readErrChan : make (chan error , 1 ),
209
217
quit : make (chan struct {}),
@@ -213,6 +221,7 @@ func newStream(id streamID, limiter *rate.Limiter,
213
221
// will cause the goroutine below to get an EOF error when reading,
214
222
// which will cause it to close the other ends of the pipe.
215
223
s .tearDown = func () error {
224
+ s .status .stop ()
216
225
err := writeWritePipe .Close ()
217
226
if err != nil {
218
227
return err
@@ -284,12 +293,14 @@ func newStream(id streamID, limiter *rate.Limiter,
284
293
// ReturnReadStream returns the target read stream back to its holding channel.
285
294
func (s * stream ) ReturnReadStream (r * readStream ) {
286
295
s .readStreamChan <- r
296
+ s .status .streamReturned (true )
287
297
}
288
298
289
299
// ReturnWriteStream returns the target write stream back to its holding
290
300
// channel.
291
301
func (s * stream ) ReturnWriteStream (w * writeStream ) {
292
302
s .writeStreamChan <- w
303
+ s .status .streamReturned (false )
293
304
}
294
305
295
306
// RequestReadStream attempts to request the read stream from the main backing
@@ -300,6 +311,7 @@ func (s *stream) RequestReadStream() (*readStream, error) {
300
311
301
312
select {
302
313
case r := <- s .readStreamChan :
314
+ s .status .streamTaken (true )
303
315
return r , nil
304
316
default :
305
317
return nil , fmt .Errorf ("read stream occupied" )
@@ -314,6 +326,7 @@ func (s *stream) RequestWriteStream() (*writeStream, error) {
314
326
315
327
select {
316
328
case w := <- s .writeStreamChan :
329
+ s .status .streamTaken (false )
317
330
return w , nil
318
331
default :
319
332
return nil , fmt .Errorf ("write stream occupied" )
@@ -324,6 +337,7 @@ func (s *stream) RequestWriteStream() (*writeStream, error) {
324
337
type hashMailServerConfig struct {
325
338
msgRate time.Duration
326
339
msgBurstAllowance int
340
+ staleTimeout time.Duration
327
341
}
328
342
329
343
// hashMailServer is an implementation of the HashMailServer gRPC service that
@@ -350,6 +364,9 @@ func newHashMailServer(cfg hashMailServerConfig) *hashMailServer {
350
364
if cfg .msgBurstAllowance == 0 {
351
365
cfg .msgBurstAllowance = DefaultMsgBurstAllowance
352
366
}
367
+ if cfg .staleTimeout == 0 {
368
+ cfg .staleTimeout = DefaultStaleTimeout
369
+ }
353
370
354
371
return & hashMailServer {
355
372
streams : make (map [streamID ]* stream ),
@@ -372,6 +389,29 @@ func (h *hashMailServer) Stop() {
372
389
373
390
}
374
391
392
+ // tearDownStaleStream can be used to tear down a stale mailbox stream.
393
+ func (h * hashMailServer ) tearDownStaleStream (id streamID ) error {
394
+ log .Debugf ("Tearing down stale HashMail stream: id=%x" , id )
395
+
396
+ h .Lock ()
397
+ defer h .Unlock ()
398
+
399
+ stream , ok := h .streams [id ]
400
+ if ! ok {
401
+ return fmt .Errorf ("stream not found" )
402
+ }
403
+
404
+ if err := stream .tearDown (); err != nil {
405
+ return err
406
+ }
407
+
408
+ delete (h .streams , id )
409
+
410
+ mailboxCount .Set (float64 (len (h .streams )))
411
+
412
+ return nil
413
+ }
414
+
375
415
// ValidateStreamAuth attempts to validate the authentication mechanism that is
376
416
// being used to claim or revoke a stream within the mail server.
377
417
func (h * hashMailServer ) ValidateStreamAuth (ctx context.Context ,
@@ -415,7 +455,9 @@ func (h *hashMailServer) InitStream(
415
455
freshStream := newStream (
416
456
streamID , limiter , func (auth * hashmailrpc.CipherBoxAuth ) error {
417
457
return nil
418
- },
458
+ }, func () error {
459
+ return h .tearDownStaleStream (streamID )
460
+ }, h .cfg .staleTimeout ,
419
461
)
420
462
421
463
h .streams [streamID ] = freshStream
@@ -430,7 +472,6 @@ func (h *hashMailServer) InitStream(
430
472
// LookUpReadStream attempts to loop up a new stream. If the stream is found, then
431
473
// the stream is marked as being active. Otherwise, an error is returned.
432
474
func (h * hashMailServer ) LookUpReadStream (streamID []byte ) (* readStream , error ) {
433
-
434
475
h .RLock ()
435
476
defer h .RUnlock ()
436
477
@@ -710,3 +751,94 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
710
751
}
711
752
712
753
var _ hashmailrpc.HashMailServer = (* hashMailServer )(nil )
754
+
755
+ // streamStatus keeps track of the occupancy status of a stream's read and
756
+ // write sub-streams. It is initialised with callback functions to call on the
757
+ // event of the streams being occupied (either or both of the streams are
758
+ // occupied) or fully idle (both streams are unoccupied).
759
+ type streamStatus struct {
760
+ disabled bool
761
+
762
+ staleTimeout time.Duration
763
+ staleTimer * time.Timer
764
+
765
+ readStreamOccupied bool
766
+ writeStreamOccupied bool
767
+ sync.Mutex
768
+ }
769
+
770
+ // newStreamStatus constructs a new streamStatus instance.
771
+ func newStreamStatus (onStale func () error ,
772
+ staleTimeout time.Duration ) * streamStatus {
773
+
774
+ if staleTimeout < 0 {
775
+ return & streamStatus {
776
+ disabled : true ,
777
+ }
778
+ }
779
+
780
+ staleTimer := time .AfterFunc (staleTimeout , func () {
781
+ if err := onStale (); err != nil {
782
+ log .Errorf ("error in onStale callback: %v" , err )
783
+ }
784
+ })
785
+
786
+ return & streamStatus {
787
+ staleTimer : staleTimer ,
788
+ staleTimeout : staleTimeout ,
789
+ }
790
+ }
791
+
792
+ // stop cleans up any resources held by streamStatus.
793
+ func (s * streamStatus ) stop () {
794
+ if s .disabled {
795
+ return
796
+ }
797
+
798
+ s .Lock ()
799
+ defer s .Unlock ()
800
+
801
+ _ = s .staleTimer .Stop ()
802
+ }
803
+
804
+ // streamTaken should be called when one of the sub-streams (read or write)
805
+ // become occupied. This will stop the staleTimer. The read parameter should be
806
+ // true if the stream being returned is the read stream.
807
+ func (s * streamStatus ) streamTaken (read bool ) {
808
+ if s .disabled {
809
+ return
810
+ }
811
+
812
+ s .Lock ()
813
+ defer s .Unlock ()
814
+
815
+ if read {
816
+ s .readStreamOccupied = true
817
+ } else {
818
+ s .writeStreamOccupied = true
819
+ }
820
+ _ = s .staleTimer .Stop ()
821
+ }
822
+
823
+ // streamReturned should be called when one of the sub-streams are released.
824
+ // If the occupancy count after this call is zero, then the staleTimer is reset.
825
+ // The read parameter should be true if the stream being returned is the read
826
+ // stream.
827
+ func (s * streamStatus ) streamReturned (read bool ) {
828
+ if s .disabled {
829
+ return
830
+ }
831
+
832
+ s .Lock ()
833
+ defer s .Unlock ()
834
+
835
+ if read {
836
+ s .readStreamOccupied = false
837
+ } else {
838
+ s .writeStreamOccupied = false
839
+ }
840
+
841
+ if ! s .readStreamOccupied && ! s .writeStreamOccupied {
842
+ _ = s .staleTimer .Reset (s .staleTimeout )
843
+ }
844
+ }
0 commit comments