
Commit d88adac

Merge pull request #18 from veksh/ratelim-window
change rate limiter to window
2 parents 43877da + aee23f9

6 files changed (+144 −23 lines)

internal/client/client.go

Lines changed: 10 additions & 12 deletions

@@ -23,9 +23,13 @@ import (
 
 const (
 	HTTP_TIMEOUT = 10
-	HTTP_RPS = 1
-	HTTP_BURST = 60
-	DOMAINS_URL = "/v1/domains/"
+	// burst RL: not currently used
+	HTTP_RPS = 1
+	HTTP_BURST = 60
+	// window RL: window size, max requests per window
+	HTTP_RATE_WINDOW = time.Duration(60) * time.Second
+	HTTP_RATE_RPW = 60
+	DOMAINS_URL = "/v1/domains/"
 )
 
 var _ model.DNSApiClient = Client{}
@@ -45,17 +49,14 @@ func NewClient(apiURL string, key string, secret string) (*Client, error) {
 			Timeout: HTTP_TIMEOUT * time.Second}).DialContext,
 		TLSHandshakeTimeout:   HTTP_TIMEOUT * time.Second,
 		ResponseHeaderTimeout: HTTP_TIMEOUT * time.Second,
-		MaxIdleConns:          10,
-		MaxIdleConnsPerHost:   10,
-		MaxConnsPerHost:       10,
-		IdleConnTimeout:       60,
 	}
-	rateLimiter, err := ratelimiter.New(HTTP_RPS, HTTP_BURST)
+	// TODO: mb make it pluggable as a parameter
+	// rateLimiter, err := ratelimiter.NewBucketRL(HTTP_RPS, HTTP_BURST)
+	rateLimiter, err := ratelimiter.NewWindowRL(HTTP_RATE_WINDOW, HTTP_RATE_RPW)
 	if err != nil {
 		return nil, errors.Wrap(err, "cannot create rate limiter")
 	}
 	httpClient := http.Client{
-		Timeout: HTTP_TIMEOUT * time.Second,
 		Transport: &rateLimitedHTTPTransport{
 			limiter: rateLimiter,
 			next:    httpTransport,
@@ -93,9 +94,6 @@ func (c Client) makeRecordsRequest(ctx context.Context, path string, method stri
 
 	requestURL, _ := url.JoinPath(c.apiURL, DOMAINS_URL, path)
 
-	ctx, fnCancel := context.WithTimeout(ctx, HTTP_TIMEOUT*time.Second)
-	defer fnCancel()
-
 	req, err := http.NewRequestWithContext(ctx, method, requestURL, body)
 	if err != nil {
 		return nil, errors.Wrap(err, "cannot create request")
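
Two timeouts disappear in this file: the client-wide http.Client Timeout and the per-request context.WithTimeout inside makeRecordsRequest. Together with the transport change below, that leaves the caller's context as the single bound on a request, including any time spent waiting for the rate-limit window. A hedged caller-side sketch (imports elided; the GetRecords signature and argument types are inferred from the mocks in record_test.go, and the host and record values are made up):

// Illustrative only, as if written inside the provider code.
func fetchARecords(apiKey, apiSecret string) ([]model.DNSRecord, error) {
	// the caller's context now bounds everything, including rate-limit waits
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	c, err := client.NewClient("https://api.godaddy.com", apiKey, apiSecret)
	if err != nil {
		return nil, err
	}
	// if the 60-per-60s window is already spent, this blocks in WaitCtx until
	// the window rolls over, or gives up as soon as ctx expires
	return c.GetRecords(ctx, "example.com", model.REC_A, "www")
}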

internal/client/ratelimittransport.go

Lines changed: 4 additions & 2 deletions

@@ -7,11 +7,13 @@ import (
 )
 
 type rateLimitedHTTPTransport struct {
-	limiter *ratelimiter.RateLimiter
+	limiter ratelimiter.Limiter
 	next    http.RoundTripper
 }
 
 func (t *rateLimitedHTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) {
-	t.limiter.Wait()
+	if err := t.limiter.WaitCtx(req.Context()); err != nil {
+		return nil, err
+	}
 	return t.next.RoundTrip(req)
 }
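
A quick way to exercise the new behaviour would be a test with a limiter stub that never admits a request: RoundTrip should surface the request context's error instead of blocking. This is only a sketch, not a test from the repo; blockingLimiter and the test name are invented:

package client

import (
	"context"
	"errors"
	"net/http"
	"testing"
	"time"
)

// blockingLimiter is a stub that only returns once the context is cancelled.
type blockingLimiter struct{}

func (blockingLimiter) Wait() { select {} }
func (blockingLimiter) WaitCtx(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func TestRoundTripHonoursContext(t *testing.T) {
	tr := &rateLimitedHTTPTransport{limiter: blockingLimiter{}, next: http.DefaultTransport}

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://example.com", nil)

	// the limiter never admits, so the deadline error must come back instead
	if _, err := tr.RoundTrip(req); !errors.Is(err, context.DeadlineExceeded) {
		t.Fatalf("expected context.DeadlineExceeded, got %v", err)
	}
}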

internal/provider/record_test.go

Lines changed: 20 additions & 4 deletions

@@ -52,7 +52,7 @@ func TestUnitALifecycle(t *testing.T) {
 	// 2nd step: update
 	mRecsToUpd := makeTestRecSet(model.REC_A, []model.DNSRecordData{"2.2.2.2", "4.4.4.4"})
 	mRecsToUpdTgt := makeTestRecSet(model.REC_A, []model.DNSRecordData{"1.1.1.1", "2.2.2.2", "4.4.4.4"})
-	// mRecsTgt4 := makeTestRecSet(model.REC_A, []model.DNSRecordData{"1.1.1.1", "4.4.4.4"})
+	mRecsTgt4 := makeTestRecSet(model.REC_A, []model.DNSRecordData{"1.1.1.1", "4.4.4.4"})
 
 	mType, mName := model.REC_A, mRecsPre.DNSRecName
 
@@ -67,11 +67,27 @@
 	mockClientUpd := model.NewMockDNSApiClient(t)
 	mockClientUpd.EXPECT().GetRecords(mCtx, mDom, mType, mName).Return(mRecsTgt.Records, nil).Times(3).Run(traceMarker("get 1 (mRegsTgt)*3"))
 	mockClientUpd.EXPECT().SetRecords(mCtx, mDom, mType, mName, mRecsToUpdTgt.UpdRecords).Return(nil).Once().Run(traceMarker("set 1 (mRecsToUpdTgt)"))
-	// again, clean-up order is not deterministic, so those are a bit fragile
 	mockClientUpd.EXPECT().GetRecords(mCtx, mDom, mType, mName).Return(mRecsToUpdTgt.Records, nil).Times(3).Run(traceMarker("get 2/1 (mRecsToUpdTgt)*3"))
-	mockClientUpd.EXPECT().GetRecords(mCtx, mDom, mType, mName).Return(mRecsTgt1.Records, nil).Once().Run(traceMarker("get 2/2 (mRecsTgt1)"))
 
-	mockClientUpd.EXPECT().SetRecords(mCtx, mDom, mType, mName, mRecsTgt1.UpdRecords).Return(nil).Once().Run(traceMarker("set 2/1 (mRecsTgt1)"))
+	// again, clean-up order is not deterministic, either could be called first
+	// lets stash the expected result and return it later
+	var res1or4 []model.DNSRecord = make([]model.DNSRecord, 2)
+	mockClientUpd.EXPECT().SetRecords(mCtx, mDom, mType, mName, mRecsTgt1.UpdRecords).Return(nil).Maybe().
+		Run(func(args mock.Arguments) {
+			fmt.Println("set 2/1 (mRecsTgt1)", args[4])
+			copy(res1or4, mRecsTgt1.Records)
+		})
+	mockClientUpd.EXPECT().SetRecords(mCtx, mDom, mType, mName, mRecsTgt4.UpdRecords).Return(nil).Maybe().
+		Run(func(args mock.Arguments) {
+			fmt.Println("set 2/1 (mRecsTgt4)", args[4])
+			copy(res1or4, mRecsTgt4.Records)
+		})
+	// now need to return right results depending on set above
+	mockClientUpd.EXPECT().GetRecords(mCtx, mDom, mType, mName).Return(res1or4, nil).Once().
+		Run(func(args mock.Arguments) {
+			fmt.Println("get 2/2 (mRecsTgt1/4 = " + res1or4[1].Data + ")")
+		})
+	// and, finally, the common cleanup
 	mockClientUpd.EXPECT().SetRecords(mCtx, mDom, mType, mName, mRecsPre.UpdRecords).Return(nil).Once().Run(traceMarker("set 2/2 (mRecsPre)"))
 
 	resource.UnitTest(t, resource.TestCase{
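
A side note on why the stash works: it is plain Go slice semantics rather than anything mockery-specific. Return(res1or4, nil) holds on to the slice header created at setup time, and the later copy in whichever SetRecords expectation actually fires writes into the same backing array, so the final GetRecords returns exactly those records. In miniature (illustrative values only):

package main

import "fmt"

func main() {
	res := make([]string, 2)
	captured := res // what .Return(...) effectively holds on to: same backing array
	copy(res, []string{"1.1.1.1", "4.4.4.4"})
	fmt.Println(captured) // [1.1.1.1 4.4.4.4]
}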

libs/ratelimiter/ratelimiter.go renamed to libs/ratelimiter/bucket.go

Lines changed: 5 additions & 5 deletions

@@ -14,7 +14,7 @@ import (
 // - check the time since a token was added, mb add some more
 // - spend one if now it is available
 // - if none, wait until next one will be available (while holding mutex)
-type RateLimiter struct {
+type BucketRateLimiter struct {
 	mu sync.Mutex
 	// interval between tokens, s
 	period time.Duration
@@ -27,11 +27,11 @@ type RateLimiter struct {
 }
 
 // context to cancel, rate per second, burst (bucket) size
-func New(rate, burst int) (*RateLimiter, error) {
+func NewBucketRL(rate, burst int) (Limiter, error) {
 	if rate <= 0 || burst <= 0 {
 		return nil, errors.New("limiter rate and burst must be positive")
 	}
-	return &RateLimiter{
+	return &BucketRateLimiter{
 		period:     time.Second / time.Duration(rate),
 		bucketSize: burst,
 		numTokens:  burst,
@@ -40,7 +40,7 @@ func New(rate, burst int) (*RateLimiter, error) {
 }
 
 // block until token becomes available
-func (s *RateLimiter) Wait() {
+func (s *BucketRateLimiter) Wait() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	// if bucket is empty: add tokens that are due, update last refill time
@@ -67,7 +67,7 @@ func (s *RateLimiter) Wait() {
 }
 
 // block until token becomes available, with cancellable context
-func (s *RateLimiter) WaitCtx(ctx context.Context) error {
+func (s *BucketRateLimiter) WaitCtx(ctx context.Context) error {
 	if ctx.Err() != nil {
 		return ctx.Err()
 	}

libs/ratelimiter/limiter.go

Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+package ratelimiter
+
+import "context"
+
+// GoDaddy current rate limiter: 60 requests in 60 seconds window per API endpoint
+// https://developer.godaddy.com/getstarted#terms
+// mb have separate limits per endpoint (get/set/del): need to see a request
+type Limiter interface {
+	Wait()
+	WaitCtx(ctx context.Context) error
+}
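
Because NewBucketRL and NewWindowRL both return this interface, the HTTP transport depends only on Limiter and the concrete limiter becomes a one-line choice. A small sketch of that swap (illustrative; written inside the ratelimiter package so no module import path has to be assumed):

package ratelimiter

import "time"

// pickLimiter is illustrative only: either implementation satisfies the same
// interface, so switching is a one-line change for the caller.
func pickLimiter(useWindow bool) (Limiter, error) {
	if useWindow {
		// 60 requests per 60-second window, matching the GoDaddy limit above
		return NewWindowRL(60*time.Second, 60)
	}
	// the older token-bucket behaviour: 1 rps with a burst of 60
	return NewBucketRL(1, 60)
}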

libs/ratelimiter/window.go

Lines changed: 94 additions & 0 deletions

@@ -0,0 +1,94 @@
+package ratelimiter
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+)
+
+// fixed window rate limiter: limit number of requests allowed per time interval
+// - keeps track of interval start and last call time
+// - on acquire:
+//   - if last call was > interval length ago, refill bucket
+//   - if there are tokens available, spend one and return immediately
+//   - if not, wait until the end of interval, refill and spend one
+// - somewhat unpleasant side effect will be the burst of `bucketSize` requests
+//   followed by the large wait for next: alternative is 1 RPS which is way slower
+//   for shorter batches
+type WindowRateLimiter struct {
+	mu sync.Mutex
+	// interval between bucket refills
+	period time.Duration
+	// total bucket size: num of requests per period
+	bucketSize int
+	// number of requests left for current period
+	numTokens int
+	// the time last bucket was refilled
+	lastRefillTime time.Time
+}
+
+// period length and num of requests per period
+func NewWindowRL(period time.Duration, RPP int) (Limiter, error) {
+	if period <= 0 || RPP <= 0 {
+		return nil, errors.New("limiter period and num of requests must be positive")
+	}
+	return &WindowRateLimiter{
+		period:         period,
+		bucketSize:     RPP,
+		numTokens:      RPP,
+		lastRefillTime: time.Now(),
+	}, nil
+}
+
+// block until token becomes available
+func (s *WindowRateLimiter) Wait() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	// if last refill happened long ago: refill now
+	if time.Since(s.lastRefillTime) > s.period {
+		s.lastRefillTime = time.Now()
+		s.numTokens = s.bucketSize
+	}
+	// if bucket is empty: wait for next refill
+	if s.numTokens == 0 {
+		nextTokenTime := s.lastRefillTime.Add(s.period)
+		waitDuration := time.Until(nextTokenTime)
+		time.Sleep(waitDuration)
+
+		s.lastRefillTime = time.Now()
+		s.numTokens = s.bucketSize
+	}
+	s.numTokens--
+}
+
+// block until token becomes available, with cancellable context
+func (s *WindowRateLimiter) WaitCtx(ctx context.Context) error {
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// if last refill happened long ago: refill now
+	if time.Since(s.lastRefillTime) > s.period {
+		s.lastRefillTime = time.Now()
+		s.numTokens = s.bucketSize
+	}
+	// if bucket is empty: wait for next refill
+	if s.numTokens == 0 {
+		nextTokenTime := s.lastRefillTime.Add(s.period)
+		waitDuration := time.Until(nextTokenTime)
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(waitDuration):
+		}
+
+		s.lastRefillTime = time.Now()
+		s.numTokens = s.bucketSize
+	}
+	s.numTokens--
+	return nil
+}
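
For reference, a short usage sketch of the window limiter above (illustrative, written as if it sat in the ratelimiter package; the tiny window and limit values are only to keep the demo quick): the first RPP calls pass straight through, the next one sleeps until the window rolls over, and WaitCtx gives up as soon as the caller's context expires.

package ratelimiter

import (
	"context"
	"fmt"
	"time"
)

// demoWindowRL is an illustrative walk-through, not part of the commit.
func demoWindowRL() {
	// 3 requests per 2-second window keeps the demo short
	rl, err := NewWindowRL(2*time.Second, 3)
	if err != nil {
		panic(err)
	}
	start := time.Now()
	for i := 0; i < 4; i++ {
		rl.Wait() // the 4th call sleeps until the window rolls over
		fmt.Printf("request %d admitted after %v\n", i+1, time.Since(start).Round(time.Second))
	}

	// WaitCtx gives up early if the caller's context expires before a slot opens
	rl2, _ := NewWindowRL(2*time.Second, 1)
	rl2.Wait() // consume the only slot in this window
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	if err := rl2.WaitCtx(ctx); err != nil {
		fmt.Println("wait aborted:", err) // context.DeadlineExceeded
	}
}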
