Skip to content

Commit 20564d4

Browse files
authored
Merge pull request #172 from djkazic/aggregated-session-stats
aperture: internally aggregate session statistics to reduce cardinality
2 parents 47d72bd + eab9d75 commit 20564d4

File tree

4 files changed

+146
-25
lines changed

4 files changed

+146
-25
lines changed

aperture.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func run() error {
151151

152152
errChan := make(chan error)
153153
a := NewAperture(cfg)
154-
if err := a.Start(errChan); err != nil {
154+
if err := a.Start(errChan, interceptor.ShutdownChannel()); err != nil {
155155
return fmt.Errorf("unable to start aperture: %v", err)
156156
}
157157

@@ -192,9 +192,9 @@ func NewAperture(cfg *Config) *Aperture {
192192
}
193193

194194
// Start sets up the proxy server and starts it.
195-
func (a *Aperture) Start(errChan chan error) error {
195+
func (a *Aperture) Start(errChan chan error, shutdown <-chan struct{}) error {
196196
// Start the prometheus exporter.
197-
err := StartPrometheusExporter(a.cfg.Prometheus)
197+
err := StartPrometheusExporter(a.cfg.Prometheus, shutdown)
198198
if err != nil {
199199
return fmt.Errorf("unable to start the prometheus "+
200200
"exporter: %v", err)

hashmail_server.go

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/btcsuite/btclog/v2"
1212
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
1313
"github.com/lightningnetwork/lnd/tlv"
14-
"github.com/prometheus/client_golang/prometheus"
1514
"golang.org/x/time/rate"
1615
"google.golang.org/grpc/codes"
1716
"google.golang.org/grpc/status"
@@ -35,6 +34,11 @@ const (
3534
// DefaultBufSize is the default number of bytes that are read in a
3635
// single operation.
3736
DefaultBufSize = 4096
37+
38+
// streamTTL is the amount of time that a stream needs to be exist without
39+
// reads for it to be considered for pruning. Otherwise, memory will grow
40+
// unbounded.
41+
streamTTL = 24 * time.Hour
3842
)
3943

4044
// streamID is the identifier of a stream.
@@ -747,9 +751,7 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
747751
streamID := newStreamID(desc.StreamId)
748752
if streamID.isOdd() {
749753
baseID := streamID.baseID()
750-
mailboxReadCount.With(prometheus.Labels{
751-
streamIDLabel: fmt.Sprintf("%x", baseID),
752-
}).Inc()
754+
streamActivityTracker.Record(fmt.Sprintf("%x", baseID))
753755
}
754756

755757
err = reader.Send(&hashmailrpc.CipherBox{
@@ -766,6 +768,91 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
766768

767769
var _ hashmailrpc.HashMailServer = (*hashMailServer)(nil)
768770

771+
// streamActivity tracks per-session read activity for classifying mailbox
772+
// sessions as active, standby, or in-use. It maintains an in-memory map
773+
// of stream IDs to counters and timestamps.
774+
type streamActivity struct {
775+
sync.Mutex
776+
streams map[string]*activityEntry
777+
}
778+
779+
// activityEntry holds the read count and last update time for a single mailbox
780+
// session.
781+
type activityEntry struct {
782+
count uint64
783+
lastUpdate time.Time
784+
}
785+
786+
// newStreamActivity creates a new streamActivity tracker used to monitor
787+
// mailbox read activity per stream ID.
788+
func newStreamActivity() *streamActivity {
789+
return &streamActivity{
790+
streams: make(map[string]*activityEntry),
791+
}
792+
}
793+
794+
// Record logs a read event for the given base stream ID.
795+
// It increments the read count and updates the last activity timestamp.
796+
func (sa *streamActivity) Record(baseID string) {
797+
sa.Lock()
798+
defer sa.Unlock()
799+
800+
entry, ok := sa.streams[baseID]
801+
if !ok {
802+
entry = &activityEntry{}
803+
sa.streams[baseID] = entry
804+
}
805+
entry.count++
806+
entry.lastUpdate = time.Now()
807+
}
808+
809+
// ClassifyAndReset categorizes each tracked stream based on its recent read
810+
// rate and returns aggregate counts of active, standby, and in-use sessions.
811+
// A stream is classified as:
812+
// - In-use: if read rate ≥ 0.5 reads/sec.
813+
// - Standby: if 0 < read rate < 0.5 reads/sec.
814+
// - Active: if read rate > 0 (includes standby and in-use).
815+
func (sa *streamActivity) ClassifyAndReset() (active, standby, inuse int) {
816+
sa.Lock()
817+
defer sa.Unlock()
818+
819+
now := time.Now()
820+
821+
for baseID, e := range sa.streams {
822+
inactiveDuration := now.Sub(e.lastUpdate)
823+
824+
// Prune if idle for >24h and no new reads.
825+
if e.count == 0 && inactiveDuration > streamTTL {
826+
delete(sa.streams, baseID)
827+
continue
828+
}
829+
830+
elapsed := inactiveDuration.Seconds()
831+
if elapsed <= 0 {
832+
// Prevent divide-by-zero, treat as 1s interval.
833+
elapsed = 1
834+
}
835+
836+
rate := float64(e.count) / elapsed
837+
838+
switch {
839+
case rate >= 0.5:
840+
inuse++
841+
case rate > 0:
842+
standby++
843+
}
844+
if rate > 0 {
845+
active++
846+
}
847+
848+
// Reset for next window.
849+
e.count = 0
850+
e.lastUpdate = now
851+
}
852+
853+
return active, standby, inuse
854+
}
855+
769856
// streamStatus keeps track of the occupancy status of a stream's read and
770857
// write sub-streams. It is initialised with callback functions to call on the
771858
// event of the streams being occupied (either or both of the streams are

hashmail_server_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ func setupAperture(t *testing.T) {
186186
}
187187
aperture := NewAperture(apertureCfg)
188188
errChan := make(chan error)
189-
require.NoError(t, aperture.Start(errChan))
189+
shutdown := make(chan struct{})
190+
require.NoError(t, aperture.Start(errChan, shutdown))
190191

191192
// Any error while starting?
192193
select {

prometheus.go

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,44 @@ package aperture
33
import (
44
"fmt"
55
"net/http"
6+
"time"
67

78
"github.com/prometheus/client_golang/prometheus"
89
"github.com/prometheus/client_golang/prometheus/promhttp"
910
)
1011

11-
const streamIDLabel = "streamID"
12-
1312
var (
1413
// mailboxCount tracks the current number of active mailboxes.
1514
mailboxCount = prometheus.NewGauge(prometheus.GaugeOpts{
1615
Namespace: "hashmail",
1716
Name: "mailbox_count",
1817
})
1918

20-
// mailboxReadCount counts each time a mailbox pair is being used.
21-
// A session consists of a bidirectional stream each using a mailbox
22-
// with an ID that overlaps for the first 63 bytes and differ for the
23-
// last bit. So in order to obtain accurate data about a specific
24-
// mailbox session, the stream ID that will be recorded is the first
25-
// 16 bytes of the session ID and we will only record the odd stream's
26-
// reads so that we don't duplicate the data.
27-
mailboxReadCount = prometheus.NewCounterVec(
28-
prometheus.CounterOpts{
29-
Namespace: "hashmail",
30-
Name: "mailbox_read_count",
31-
}, []string{streamIDLabel},
32-
)
19+
// activeSessions tracks the active session count for mailbox.
20+
activeSessions = prometheus.NewGauge(prometheus.GaugeOpts{
21+
Namespace: "hashmail",
22+
Name: "mailbox_active_sessions",
23+
Help: "Number of active sessions",
24+
})
25+
26+
// standbySessions tracks the standby session count for mailbox.
27+
standbySessions = prometheus.NewGauge(prometheus.GaugeOpts{
28+
Namespace: "hashmail",
29+
Name: "mailbox_standby_sessions",
30+
Help: "Number of standby sessions",
31+
})
32+
33+
// inUseSessions tracks the in-use session count for mailbox.
34+
inUseSessions = prometheus.NewGauge(prometheus.GaugeOpts{
35+
Namespace: "hashmail",
36+
Name: "mailbox_inuse_sessions",
37+
Help: "Number of in-use sessions",
38+
})
3339
)
3440

41+
// streamActivityTracker handles the calculation of session statistics.
42+
var streamActivityTracker = newStreamActivity()
43+
3544
// PrometheusConfig is the set of configuration data that specifies if
3645
// Prometheus metric exporting is activated, and if so the listening address of
3746
// the Prometheus server.
@@ -47,15 +56,39 @@ type PrometheusConfig struct {
4756
// StartPrometheusExporter registers all relevant metrics with the Prometheus
4857
// library, then launches the HTTP server that Prometheus will hit to scrape
4958
// our metrics.
50-
func StartPrometheusExporter(cfg *PrometheusConfig) error {
59+
func StartPrometheusExporter(cfg *PrometheusConfig,
60+
shutdown <-chan struct{}) error {
61+
5162
// If we're not active, then there's nothing more to do.
5263
if !cfg.Enabled {
5364
return nil
5465
}
5566

5667
// Next, we'll register all our metrics.
5768
prometheus.MustRegister(mailboxCount)
58-
prometheus.MustRegister(mailboxReadCount)
69+
prometheus.MustRegister(activeSessions)
70+
prometheus.MustRegister(standbySessions)
71+
prometheus.MustRegister(inUseSessions)
72+
73+
// Periodically update session classification metrics from internal tracker.
74+
go func() {
75+
ticker := time.NewTicker(10 * time.Second)
76+
defer ticker.Stop()
77+
78+
for {
79+
select {
80+
case <-ticker.C:
81+
active, standby, inuse :=
82+
streamActivityTracker.ClassifyAndReset()
83+
activeSessions.Set(float64(active))
84+
standbySessions.Set(float64(standby))
85+
inUseSessions.Set(float64(inuse))
86+
case <-shutdown:
87+
log.Infof("Shutting down Prometheus session metrics updater")
88+
return
89+
}
90+
}
91+
}()
5992

6093
// Finally, we'll launch the HTTP server that Prometheus will use to
6194
// scape our metrics.

0 commit comments

Comments
 (0)