8
8
"sync"
9
9
"time"
10
10
11
+ "github.com/btcsuite/btclog/v2"
11
12
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
12
13
"github.com/lightningnetwork/lnd/tlv"
13
14
"github.com/prometheus/client_golang/prometheus"
@@ -104,8 +105,8 @@ func (r *readStream) ReadNextMsg(ctx context.Context) ([]byte, error) {
104
105
105
106
// ReturnStream gives up the read stream by passing it back up through the
106
107
// payment stream.
107
- func (r * readStream ) ReturnStream () {
108
- log .Debugf ( "Returning read stream %x" , r . parentStream . id [:] )
108
+ func (r * readStream ) ReturnStream (ctx context. Context ) {
109
+ log .DebugS ( ctx , "Returning read stream" )
109
110
r .parentStream .ReturnReadStream (r )
110
111
}
111
112
@@ -193,7 +194,7 @@ type stream struct {
193
194
}
194
195
195
196
// newStream creates a new stream independent of any given stream ID.
196
- func newStream (id streamID , limiter * rate.Limiter ,
197
+ func newStream (ctx context. Context , id streamID , limiter * rate.Limiter ,
197
198
equivAuth func (auth * hashmailrpc.CipherBoxAuth ) error ,
198
199
onStale func () error , staleTimeout time.Duration ) * stream {
199
200
@@ -210,7 +211,7 @@ func newStream(id streamID, limiter *rate.Limiter,
210
211
id : id ,
211
212
equivAuth : equivAuth ,
212
213
limiter : limiter ,
213
- status : newStreamStatus (onStale , staleTimeout ),
214
+ status : newStreamStatus (ctx , onStale , staleTimeout ),
214
215
readBytesChan : make (chan []byte ),
215
216
readErrChan : make (chan error , 1 ),
216
217
quit : make (chan struct {}),
@@ -305,8 +306,8 @@ func (s *stream) ReturnWriteStream(w *writeStream) {
305
306
// RequestReadStream attempts to request the read stream from the main backing
306
307
// stream. If we're unable to obtain it before the timeout, then an error is
307
308
// returned.
308
- func (s * stream ) RequestReadStream () (* readStream , error ) {
309
- log .Tracef ( "HashMailStream(%x): requesting read stream", s . id [:] )
309
+ func (s * stream ) RequestReadStream (ctx context. Context ) (* readStream , error ) {
310
+ log .TraceS ( ctx , "Requested read stream" )
310
311
311
312
select {
312
313
case r := <- s .readStreamChan :
@@ -320,8 +321,8 @@ func (s *stream) RequestReadStream() (*readStream, error) {
320
321
// RequestWriteStream attempts to request the read stream from the main backing
321
322
// stream. If we're unable to obtain it before the timeout, then an error is
322
323
// returned.
323
- func (s * stream ) RequestWriteStream () (* writeStream , error ) {
324
- log .Tracef ( "HashMailStream(%x): requesting write stream", s . id [:] )
324
+ func (s * stream ) RequestWriteStream (ctx context. Context ) (* writeStream , error ) {
325
+ log .TraceS ( ctx , "Requesting write stream" )
325
326
326
327
select {
327
328
case w := <- s .writeStreamChan :
@@ -389,8 +390,10 @@ func (h *hashMailServer) Stop() {
389
390
}
390
391
391
392
// tearDownStaleStream can be used to tear down a stale mailbox stream.
392
- func (h * hashMailServer ) tearDownStaleStream (id streamID ) error {
393
- log .Debugf ("Tearing down stale HashMail stream: id=%x" , id )
393
+ func (h * hashMailServer ) tearDownStaleStream (ctx context.Context ,
394
+ id streamID ) error {
395
+
396
+ log .DebugS (ctx , "Tearing down stale HashMail stream" )
394
397
395
398
h .Lock ()
396
399
defer h .Unlock ()
@@ -428,15 +431,15 @@ func (h *hashMailServer) ValidateStreamAuth(ctx context.Context,
428
431
}
429
432
430
433
// InitStream attempts to initialize a new stream given a valid descriptor.
431
- func (h * hashMailServer ) InitStream (
434
+ func (h * hashMailServer ) InitStream (ctx context. Context ,
432
435
init * hashmailrpc.CipherBoxAuth ) (* hashmailrpc.CipherInitResp , error ) {
433
436
434
437
h .Lock ()
435
438
defer h .Unlock ()
436
439
437
440
streamID := newStreamID (init .Desc .StreamId )
438
441
439
- log .Debugf ( "Creating new HashMail Stream: %x" , streamID )
442
+ log .DebugS ( ctx , "Creating new HashMail Stream" )
440
443
441
444
// The stream is already active, and we only allow a single session for
442
445
// a given stream to exist.
@@ -452,10 +455,11 @@ func (h *hashMailServer) InitStream(
452
455
rate .Every (h .cfg .msgRate ), h .cfg .msgBurstAllowance ,
453
456
)
454
457
freshStream := newStream (
455
- streamID , limiter , func (auth * hashmailrpc.CipherBoxAuth ) error {
458
+ ctx , streamID , limiter ,
459
+ func (auth * hashmailrpc.CipherBoxAuth ) error {
456
460
return nil
457
461
}, func () error {
458
- return h .tearDownStaleStream (streamID )
462
+ return h .tearDownStaleStream (ctx , streamID )
459
463
}, h .cfg .staleTimeout ,
460
464
)
461
465
@@ -470,7 +474,9 @@ func (h *hashMailServer) InitStream(
470
474
471
475
// LookUpReadStream attempts to loop up a new stream. If the stream is found, then
472
476
// the stream is marked as being active. Otherwise, an error is returned.
473
- func (h * hashMailServer ) LookUpReadStream (streamID []byte ) (* readStream , error ) {
477
+ func (h * hashMailServer ) LookUpReadStream (ctx context.Context ,
478
+ streamID []byte ) (* readStream , error ) {
479
+
474
480
h .RLock ()
475
481
defer h .RUnlock ()
476
482
@@ -479,12 +485,13 @@ func (h *hashMailServer) LookUpReadStream(streamID []byte) (*readStream, error)
479
485
return nil , fmt .Errorf ("stream not found" )
480
486
}
481
487
482
- return stream .RequestReadStream ()
488
+ return stream .RequestReadStream (ctx )
483
489
}
484
490
485
491
// LookUpWriteStream attempts to loop up a new stream. If the stream is found,
486
492
// then the stream is marked as being active. Otherwise, an error is returned.
487
- func (h * hashMailServer ) LookUpWriteStream (streamID []byte ) (* writeStream , error ) {
493
+ func (h * hashMailServer ) LookUpWriteStream (ctx context.Context ,
494
+ streamID []byte ) (* writeStream , error ) {
488
495
489
496
h .RLock ()
490
497
defer h .RUnlock ()
@@ -494,7 +501,7 @@ func (h *hashMailServer) LookUpWriteStream(streamID []byte) (*writeStream, error
494
501
return nil , fmt .Errorf ("stream not found" )
495
502
}
496
503
497
- return stream .RequestWriteStream ()
504
+ return stream .RequestWriteStream (ctx )
498
505
}
499
506
500
507
// TearDownStream attempts to tear down a stream which renders both sides of
@@ -523,8 +530,7 @@ func (h *hashMailServer) TearDownStream(ctx context.Context, streamID []byte,
523
530
return err
524
531
}
525
532
526
- log .Debugf ("Tearing down HashMail stream: id=%x, auth=%v" ,
527
- auth .Desc .StreamId , auth .Auth )
533
+ log .DebugS (ctx , "Tearing down HashMail stream" , "auth" , auth .Auth )
528
534
529
535
// At this point we know the auth was valid, so we'll tear down the
530
536
// stream.
@@ -568,16 +574,17 @@ func (h *hashMailServer) NewCipherBox(ctx context.Context,
568
574
return nil , err
569
575
}
570
576
571
- log .Debugf ("New HashMail stream init: id=%x, auth=%v" ,
572
- init .Desc .StreamId , init .Auth )
577
+ ctxl := btclog .WithCtx (ctx , btclog .Hex ("stream_id" , init .Desc .StreamId ))
578
+
579
+ log .DebugS (ctxl , "New HashMail stream init" , "auth" , init .Auth )
573
580
574
- if err := h .ValidateStreamAuth (ctx , init ); err != nil {
575
- log .Debugf ( "Stream creation validation failed (id=%x): %v " ,
576
- init . Desc . StreamId , err )
581
+ if err := h .ValidateStreamAuth (ctxl , init ); err != nil {
582
+ log .DebugS ( ctxl , "Stream creation validation failed" ,
583
+ "err" , err )
577
584
return nil , err
578
585
}
579
586
580
- resp , err := h .InitStream (init )
587
+ resp , err := h .InitStream (ctxl , init )
581
588
if err != nil {
582
589
return nil , err
583
590
}
@@ -597,8 +604,9 @@ func (h *hashMailServer) DelCipherBox(ctx context.Context,
597
604
return nil , err
598
605
}
599
606
600
- log .Debugf ("New HashMail stream deletion: id=%x, auth=%v" ,
601
- auth .Desc .StreamId , auth .Auth )
607
+ ctxl := btclog .WithCtx (ctx , btclog .Hex ("stream_id" , auth .Desc .StreamId ))
608
+
609
+ log .DebugS (ctxl , "New HashMail stream deletion" , "auth" , auth .Auth )
602
610
603
611
if err := h .TearDownStream (ctx , auth .Desc .StreamId , auth ); err != nil {
604
612
return nil , err
@@ -610,7 +618,7 @@ func (h *hashMailServer) DelCipherBox(ctx context.Context,
610
618
// SendStream implements the client streaming call to utilize the write end of
611
619
// a stream to send a message to the read end.
612
620
func (h * hashMailServer ) SendStream (readStream hashmailrpc.HashMail_SendStreamServer ) error {
613
- log .Debugf ("New HashMail write stream pending..." )
621
+ log .Debug ("New HashMail write stream pending..." )
614
622
615
623
// We'll need to receive the first message in order to determine if
616
624
// this stream exists or not
@@ -621,6 +629,11 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
621
629
return err
622
630
}
623
631
632
+ ctx := btclog .WithCtx (
633
+ readStream .Context (),
634
+ btclog .Hex ("stream_id" , cipherBox .Desc .StreamId ),
635
+ )
636
+
624
637
switch {
625
638
case cipherBox .Desc == nil :
626
639
return fmt .Errorf ("cipher box descriptor required" )
@@ -629,12 +642,11 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
629
642
return fmt .Errorf ("stream_id required" )
630
643
}
631
644
632
- log .Debugf ("New HashMail write stream: id=%x" ,
633
- cipherBox .Desc .StreamId )
645
+ log .DebugS (ctx , "New HashMail write stream" )
634
646
635
647
// Now that we have the first message, we can attempt to look up the
636
648
// given stream.
637
- writeStream , err := h .LookUpWriteStream (cipherBox .Desc .StreamId )
649
+ writeStream , err := h .LookUpWriteStream (ctx , cipherBox .Desc .StreamId )
638
650
if err != nil {
639
651
return err
640
652
}
@@ -643,13 +655,12 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
643
655
// write inactive if the client hangs up on their end.
644
656
defer writeStream .ReturnStream ()
645
657
646
- log .Tracef ( "Sending msg_len=%v to stream_id=%x" , len ( cipherBox . Msg ) ,
647
- cipherBox .Desc . StreamId )
658
+ log .TraceS ( ctx , "Sending message to stream" ,
659
+ "msg_len" , len ( cipherBox .Msg ) )
648
660
649
661
// We'll send the first message into the stream, then enter our loop
650
662
// below to continue to read from the stream and send it to the read
651
663
// end.
652
- ctx := readStream .Context ()
653
664
if err := writeStream .WriteMsg (ctx , cipherBox .Msg ); err != nil {
654
665
return err
655
666
}
@@ -659,7 +670,7 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
659
670
// exit before shutting down.
660
671
select {
661
672
case <- ctx .Done ():
662
- log .Debugf ( "SendStream: Context done, exiting" )
673
+ log .DebugS ( ctx , "SendStream: Context done, exiting" )
663
674
return nil
664
675
case <- h .quit :
665
676
return fmt .Errorf ("server shutting down" )
@@ -669,13 +680,13 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
669
680
670
681
cipherBox , err := readStream .Recv ()
671
682
if err != nil {
672
- log .Debugf ( "SendStream: Exiting write stream RPC " +
673
- "stream read: %v " , err )
683
+ log .DebugS ( ctx , "SendStream: Exiting write stream RPC " +
684
+ "stream read" , err )
674
685
return err
675
686
}
676
687
677
- log .Tracef ( "Sending msg_len=%v to stream_id=%x " ,
678
- len (cipherBox .Msg ), cipherBox . Desc . StreamId )
688
+ log .TraceS ( ctx , "Sending message to stream " ,
689
+ "msg_len" , len (cipherBox .Msg ))
679
690
680
691
if err := writeStream .WriteMsg (ctx , cipherBox .Msg ); err != nil {
681
692
return err
@@ -689,25 +700,30 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
689
700
func (h * hashMailServer ) RecvStream (desc * hashmailrpc.CipherBoxDesc ,
690
701
reader hashmailrpc.HashMail_RecvStreamServer ) error {
691
702
703
+ ctx := btclog .WithCtx (
704
+ reader .Context (),
705
+ btclog .Hex ("stream_id" , desc .StreamId ),
706
+ )
707
+
692
708
// First, we'll attempt to locate the stream. We allow any single
693
709
// entity that knows of the full stream ID to access the read end.
694
- readStream , err := h .LookUpReadStream (desc .StreamId )
710
+ readStream , err := h .LookUpReadStream (ctx , desc .StreamId )
695
711
if err != nil {
696
712
return err
697
713
}
698
714
699
- log .Debugf ( "New HashMail read stream: id=%x" , desc . StreamId )
715
+ log .DebugS ( ctx , "New HashMail read stream" )
700
716
701
717
// If the reader hangs up, then we'll mark the stream as inactive so
702
718
// another can take its place.
703
- defer readStream .ReturnStream ()
719
+ defer readStream .ReturnStream (ctx )
704
720
705
721
for {
706
722
// Check to see if the stream has been closed or if we need to
707
- // exit before shutting down .
723
+ // exit before shutting d[own .
708
724
select {
709
725
case <- reader .Context ().Done ():
710
- log .Debugf ( "Read stream context done." )
726
+ log .DebugS ( ctx , "Read stream context done." )
711
727
return nil
712
728
case <- h .quit :
713
729
return fmt .Errorf ("server shutting down" )
@@ -717,12 +733,11 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
717
733
718
734
nextMsg , err := readStream .ReadNextMsg (reader .Context ())
719
735
if err != nil {
720
- log .Debugf ( "Got error an read stream read: %v " , err )
736
+ log .ErrorS ( ctx , "Got error on read stream read" , err )
721
737
return err
722
738
}
723
739
724
- log .Tracef ("Read %v bytes for HashMail stream_id=%x" ,
725
- len (nextMsg ), desc .StreamId )
740
+ log .TraceS (ctx , "Read bytes" , "msg_len" , len (nextMsg ))
726
741
727
742
// In order not to duplicate metric data, we only record this
728
743
// read if its streamID is odd. We use the base stream ID as the
@@ -742,8 +757,8 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
742
757
Msg : nextMsg ,
743
758
})
744
759
if err != nil {
745
- log .Debugf ( "Got error when sending on read stream: %v " ,
746
- err )
760
+ log .DebugS ( ctx , "Got error when sending on read stream" ,
761
+ "err" , err )
747
762
return err
748
763
}
749
764
}
@@ -767,7 +782,7 @@ type streamStatus struct {
767
782
}
768
783
769
784
// newStreamStatus constructs a new streamStatus instance.
770
- func newStreamStatus (onStale func () error ,
785
+ func newStreamStatus (ctx context. Context , onStale func () error ,
771
786
staleTimeout time.Duration ) * streamStatus {
772
787
773
788
if staleTimeout < 0 {
@@ -778,7 +793,7 @@ func newStreamStatus(onStale func() error,
778
793
779
794
staleTimer := time .AfterFunc (staleTimeout , func () {
780
795
if err := onStale (); err != nil {
781
- log .Errorf ( "error in onStale callback: %v " , err )
796
+ log .ErrorS ( ctx , "Error from onStale callback" , err )
782
797
}
783
798
})
784
799
0 commit comments