@@ -3,17 +3,65 @@ package main
3
3
import (
4
4
"errors"
5
5
"flag"
6
- "github.com/bzed/go-whisper"
7
- "github.com/marpaia/graphite-golang"
8
6
"log"
9
7
"math"
10
8
"os"
11
9
"path/filepath"
12
10
"strconv"
13
11
"strings"
14
12
"sync"
13
+ "time"
14
+
15
+ "github.com/bzed/go-whisper"
16
+ "github.com/marpaia/graphite-golang"
15
17
)
16
18
19
+ type rateLimiter struct {
20
+ pointsPerSecond int64
21
+ currentPoints int64
22
+ full chan bool
23
+ lock * sync.Mutex
24
+ enabled bool
25
+ }
26
+
27
+ func newRateLimiter (pointsPerSecond int64 ) * rateLimiter {
28
+ rl := new (rateLimiter )
29
+ rl .pointsPerSecond = pointsPerSecond
30
+ rl .currentPoints = 0
31
+ rl .full = make (chan bool )
32
+ rl .lock = new (sync.Mutex )
33
+ if pointsPerSecond == 0 {
34
+ rl .enabled = false
35
+ } else {
36
+ rl .enabled = true
37
+ go func () {
38
+ for {
39
+ time .Sleep (1 * time .Second )
40
+ select {
41
+ case <- rl .full :
42
+ default :
43
+ }
44
+ }
45
+ }()
46
+ return rl
47
+ }
48
+ return rl
49
+ }
50
+
51
+ func (rl * rateLimiter ) limit (n int64 ) {
52
+ if ! rl .enabled {
53
+ return
54
+ }
55
+ rl .lock .Lock ()
56
+ defer rl .lock .Unlock ()
57
+
58
+ rl .currentPoints += n
59
+ if rl .currentPoints >= rl .pointsPerSecond {
60
+ rl .full <- true
61
+ rl .currentPoints = 0
62
+ }
63
+ }
64
+
17
65
func convertFilename (filename string , baseDirectory string ) (string , error ) {
18
66
absFilename , err := filepath .Abs (filename )
19
67
if err != nil {
@@ -48,6 +96,7 @@ func sendWhisperData(
48
96
graphiteConn * graphite.Graphite ,
49
97
fromTs int ,
50
98
toTs int ,
99
+ rateLimiter * rateLimiter ,
51
100
) error {
52
101
metricName , err := convertFilename (filename , baseDirectory )
53
102
if err != nil {
@@ -75,6 +124,7 @@ func sendWhisperData(
75
124
metrics = append (metrics , graphite .NewMetric (metricName , v , int64 (interval )))
76
125
77
126
}
127
+ rateLimiter .limit (int64 (len (metrics )))
78
128
err = graphiteConn .SendMetrics (metrics )
79
129
if err != nil {
80
130
return err
@@ -107,8 +157,8 @@ func worker(ch chan string,
107
157
graphitePort int ,
108
158
graphiteProtocol string ,
109
159
fromTs int ,
110
- toTs int ) {
111
-
160
+ toTs int ,
161
+ rateLimiter * rateLimiter ) {
112
162
defer wg .Done ()
113
163
114
164
graphiteConn , err := graphite .GraphiteFactory (graphiteProtocol , graphiteHost , graphitePort , "" )
@@ -121,7 +171,7 @@ func worker(ch chan string,
121
171
case path := <- ch :
122
172
{
123
173
124
- err := sendWhisperData (path , baseDirectory , graphiteConn , fromTs , toTs )
174
+ err := sendWhisperData (path , baseDirectory , graphiteConn , fromTs , toTs , rateLimiter )
125
175
if err != nil {
126
176
log .Println ("Failed: " + path )
127
177
log .Println (err )
@@ -170,6 +220,10 @@ func main() {
170
220
"to" ,
171
221
2147483647 ,
172
222
"Ending timestamp to dump data up to" )
223
+ pointsPerSecond := flag .Int64 (
224
+ "pps" ,
225
+ 0 ,
226
+ "Number of maximum points per second to send (0 means rate limiter is disabled)" )
173
227
flag .Parse ()
174
228
175
229
if ! (* graphiteProtocol == "tcp" ||
@@ -181,9 +235,10 @@ func main() {
181
235
quit := make (chan int )
182
236
var wg sync.WaitGroup
183
237
238
+ rl := newRateLimiter (* pointsPerSecond )
184
239
wg .Add (* workers )
185
240
for i := 0 ; i < * workers ; i ++ {
186
- go worker (ch , quit , & wg , * baseDirectory , * graphiteHost , * graphitePort , * graphiteProtocol , * fromTs , * toTs )
241
+ go worker (ch , quit , & wg , * baseDirectory , * graphiteHost , * graphitePort , * graphiteProtocol , * fromTs , * toTs , rl )
187
242
}
188
243
go findWhisperFiles (ch , quit , * directory )
189
244
wg .Wait ()
0 commit comments