Skip to content

Commit 087a846

Browse files
committed
added connection retry for streaming API
1 parent b86e4ad commit 087a846

File tree

2 files changed

+61
-25
lines changed

2 files changed

+61
-25
lines changed

atlas/stream_strategy.go

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,69 +3,109 @@ package atlas
33
import (
44
"context"
55
"strconv"
6+
"strings"
67
"sync"
8+
"time"
79

810
"github.com/DNS-OARC/ripeatlas/measurement"
911
"github.com/prometheus/common/log"
1012

1113
"github.com/DNS-OARC/ripeatlas"
1214
)
1315

16+
const ConnectionRetryInterval = 30 * time.Second
17+
1418
type streamingStrategy struct {
1519
stream *ripeatlas.Stream
1620
results map[string]map[int]*measurement.Result
1721
workers uint
22+
timeout time.Duration
1823
mu sync.RWMutex
1924
}
2025

2126
// NewStreamingStrategy returns a strategy using the RIPE Atlas Streaming API
22-
func NewStreamingStrategy(ctx context.Context, ids []string, workers uint) (Strategy, error) {
27+
func NewStreamingStrategy(ctx context.Context, ids []string, workers uint, timeout time.Duration) Strategy {
2328
s := &streamingStrategy{
2429
stream: ripeatlas.NewStream(),
2530
workers: workers,
31+
timeout: timeout,
2632
results: make(map[string]map[int]*measurement.Result),
2733
}
2834

29-
err := s.start(ctx, ids)
30-
if err != nil {
31-
return nil, err
32-
}
33-
34-
return s, nil
35+
s.start(ctx, ids)
36+
return s
3537
}
3638

37-
func (s *streamingStrategy) start(ctx context.Context, ids []string) error {
39+
func (s *streamingStrategy) start(ctx context.Context, ids []string) {
3840
for _, id := range ids {
39-
msm, err := strconv.Atoi(id)
41+
go s.startListening(ctx, id)
42+
}
43+
}
44+
45+
func (s *streamingStrategy) startListening(ctx context.Context, id string) {
46+
for {
47+
ch, err := s.subscribe(id)
4048
if err != nil {
41-
return err
49+
log.Error(err)
50+
} else {
51+
log.Infof("Subscribed to results of measurement #%s", id)
52+
s.listenForResults(ctx, ch)
4253
}
4354

44-
ch, err := s.stream.MeasurementResults(ripeatlas.Params{
45-
"msm": msm,
46-
})
47-
if err != nil {
48-
return err
55+
select {
56+
case <-ctx.Done():
57+
return
58+
case <-time.After(ConnectionRetryInterval):
59+
delete(s.results, id)
60+
continue
4961
}
62+
}
63+
}
64+
65+
func (s *streamingStrategy) subscribe(id string) (<-chan *measurement.Result, error) {
66+
msm, err := strconv.Atoi(id)
67+
if err != nil {
68+
return nil, err
69+
}
5070

51-
go s.listenForResults(ctx, ch)
71+
ch, err := s.stream.MeasurementResults(ripeatlas.Params{
72+
"msm": msm,
73+
})
74+
if err != nil {
75+
return nil, err
5276
}
5377

54-
return nil
78+
return ch, nil
5579
}
5680

5781
func (s *streamingStrategy) listenForResults(ctx context.Context, ch <-chan *measurement.Result) {
5882
for {
5983
select {
6084
case m := <-ch:
61-
go s.warmProbeCache(m)
62-
s.addOrReplace(m)
85+
if m.ParseError != nil {
86+
log.Error(m.ParseError)
87+
}
88+
89+
if m.ParseError != nil && strings.HasPrefix(m.ParseError.Error(), "c.On(disconnect)") {
90+
log.Error(m.ParseError)
91+
return
92+
}
93+
94+
s.processMeasurement(m)
95+
case <-time.After(s.timeout):
96+
log.Errorf("Timeout reached. Trying to reconnect.")
97+
return
6398
case <-ctx.Done():
6499
return
65100
}
66101
}
67102
}
68103

104+
func (s *streamingStrategy) processMeasurement(m *measurement.Result) {
105+
go s.warmProbeCache(m)
106+
s.addOrReplace(m)
107+
}
108+
69109
func (s *streamingStrategy) warmProbeCache(m *measurement.Result) {
70110
_, err := probeForID(m.PrbId())
71111
if err != nil {

main.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var (
3030
timeout = flag.Duration("timeout", 60*time.Second, "Timeout")
3131
workerCount = flag.Uint("worker.count", 8, "Number of go routines retrieving probe information")
3232
streaming = flag.Bool("streaming", true, "Retrieve data by subscribing to Atlas Streaming API")
33+
streamingTimeout = flag.Duration("streaming.timeout", 5*time.Minute, "When no update is received in this timespan a reconnect is initiated.")
3334
cfg *config.Config
3435
strategy atlas.Strategy
3536
)
@@ -59,12 +60,7 @@ func main() {
5960
if *streaming {
6061
ctx, cancel := context.WithCancel(context.Background())
6162
defer cancel()
62-
strategy, err = atlas.NewStreamingStrategy(ctx, cfg.Measurements, *workerCount)
63-
if err != nil {
64-
log.Error(err)
65-
os.Exit(2)
66-
}
67-
63+
strategy = atlas.NewStreamingStrategy(ctx, cfg.Measurements, *workerCount, *streamingTimeout)
6864
} else {
6965
strategy = atlas.NewRequestStrategy(*workerCount)
7066
}

0 commit comments

Comments
 (0)