Draft rate limiter with example usage #4121

Closed · wants to merge 18 commits (changes shown below are from 2 commits)
122 changes: 122 additions & 0 deletions pkg/common/rate_limiter.go
@@ -0,0 +1,122 @@
package common
Collaborator:

Do we have to be this generic? Can we give this its own package? (serious question, I'm not very good at Go)

Contributor Author:

I think giving it its own package is probably a good idea--in particular we can then add useful RateLimits.


import (
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/trufflesecurity/trufflehog/v3/pkg/context"
)

// RateLimit represents a rate limiting implementation. A rate limiter
// comprises 0 or more limits. Policies should be goroutine safe.
Collaborator:

As a dumb guy, I benefit from extremely clear vocabulary. This comment has four different domain nouns:

  • "rate limiting implementation" [aka RateLimit]
  • "rate limiter"
  • "limit"
  • "policy"

But I can't tell how they relate to each other :( Is a RateLimit a limit? Is a limit a policy? If so, why are multiple terms being used?

Contributor Author:

Oh no this is perfect (also I got rid of "policy" and that was an imperfect editing step). Let me tighten it up a bit 👍🏻

//
// Importantly, limits can assume they're only ever used on a single API, and
// thus can be used in more than one rate limiter. For example, if an API has 2
// endpoints, one accepts 5r/s and another accepts 1r/s, but both have a limit
// of total 500r/month, the policy implementing the 500r/month limit should be
// able to be used in both of the 2 rate limiters for the 5r/s and 1r/s limits.
Contributor:

Nice

Collaborator:

I'm also too dumb for this comment. What is the difference between an endpoint "accepting" 5r/s and having a "limit" of 500r/mo? And why does a limit being used for a single API mean that it can be used in more than one rate limiter? (Why would a limit used for multiple APIs not be usable in more than one rate limiter?)

Contributor Author:

Haha ok let me 🔨 on it a little

type RateLimit interface {
// Execute and Update execute and update a policy, respectively. These
Collaborator:

What do "execute" and "update" mean in the domain? I'm inferring that "execute" means "perform an API operation in a way that respects all configured rate limits, accounting for any known relevant state" and "update" means "update relevant rate limit state," but I had to think about it, and I don't know if I'm correct.

Collaborator:

It might clarify things to write a separate doc comment for each. (Maybe not, though.)

Contributor Author:

Good idea yeah; gave this a shot 👍🏻

// should:
// - Be goroutine safe
// - Check if ctx has been canceled
// - Not modify req/res
// If they return an error, it's combined with errors from the
// execution/updating of the other limits. Other limits will still be
// executed/updated.
//
// If waiting/sleeping is required, Execute should do it. Keep in mind,
// however, that each policy's Execute method is called serially, so Execute
// should *NEVER* sleep for a duration--it should only sleep until a time.
// This also means that if an API only returns durations, Update must
// immediately convert them into times, and it's recommended to pad these
// somewhat.
Execute(ctx context.Context, req *http.Request, now time.Time) error
Update(ctx context.Context, res *http.Response) error
}
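
To make the shared-limit idea from the doc comment concrete, here is a minimal sketch (not part of this PR) of a monthly-cap limit that could be handed to two different rate limiters alongside their per-endpoint limits. The MonthlyCapLimit name and numbers are hypothetical, and the standard library context is used for brevity in place of the trufflehog context wrapper.

```go
package common

import (
	"context"
	"net/http"
	"sync"
	"time"
)

// MonthlyCapLimit is a hypothetical RateLimit that enforces a shared monthly
// request cap. Because it only tracks its own counter, the same instance can
// be passed to several RateLimiters (e.g. one per endpoint). The accounting
// is deliberately loose under heavy concurrency; it's only a sketch.
type MonthlyCapLimit struct {
	mu        sync.Mutex
	capacity  int
	remaining int
	resetAt   time.Time
}

func NewMonthlyCapLimit(capacity int, resetAt time.Time) *MonthlyCapLimit {
	return &MonthlyCapLimit{capacity: capacity, remaining: capacity, resetAt: resetAt}
}

// Execute lets the request through while budget remains; otherwise it sleeps
// until the known reset *time* (never for a bare duration), or returns early
// if the context is canceled.
func (m *MonthlyCapLimit) Execute(ctx context.Context, req *http.Request, now time.Time) error {
	m.mu.Lock()
	if m.remaining > 0 {
		m.mu.Unlock()
		return nil
	}
	resetAt := m.resetAt
	m.mu.Unlock()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(resetAt.Sub(now)):
	}

	m.mu.Lock()
	m.remaining = m.capacity
	m.resetAt = resetAt.AddDate(0, 1, 0)
	m.mu.Unlock()
	return nil
}

// Update spends one unit of the monthly budget after a response is observed.
func (m *MonthlyCapLimit) Update(ctx context.Context, res *http.Response) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.remaining--
	return nil
}
```

Two limiters could then share one instance, e.g. `NewRateLimiter(fiveRPSLimit, monthly)` for the fast endpoint and `NewRateLimiter(oneRPSLimit, monthly)` for the slow one, where `fiveRPSLimit` and `oneRPSLimit` are placeholder per-endpoint limits.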

// RateLimiter provides a facility for rate limiting HTTP requests. To use it:
// - Create a RateLimiter with its limits
// - Call .Do instead of what you would normally call to make a request
// - Process the response (returned from .Do) as normal
type RateLimiter struct {
Contributor:

I'm going to suggest this be ClientSideRateLimiter just to be very clear what this is intended to do, but it's not a hill to die on. I struggled without context to get out of the server-side mindset when reviewing this, and I imagine future devs might make the same mistake when they reach for a server side rate limiting library and see "rate limiting".

Contributor:

Some side conversation happened on this topic in slack. I leave it up to your excellent judgement.

Contributor Author:

(cc: @rosecodym) I started to %s/RateLimit/SourceIntegrationRateLimit/g, but then I thought: what if, instead of common, we put this in sources?

limits []RateLimit
}

// NewRateLimiter returns a new rate limiter with the given limits.
func NewRateLimiter(limits ...RateLimit) *RateLimiter {
return &RateLimiter{limits: limits}
}

// Do makes an HTTP request subject to the rate limiter's limits.
func (rl *RateLimiter) Do(
Contributor:

Is there a way to make sure that we don't repeatedly rate limit in a cycle forever? Or is that something that we have to force onto clients of this library?

Contributor Author:

Yeah, the idea is that the call to ctx.Err checks whether the context has been canceled (i.e. it hit a timeout or deadline) before we execute a RateLimit.

I also ran down how Go's HTTP client handles contexts, and basically you build the context into the request if you want it. This isn't ideal; I didn't do a survey, but I'd be (very) surprised if our Sources did this consistently. I'm pretty sure there's a default timeout somewhere, but I don't know what it is (almost certainly very long). Maybe tossing in another ctx.Err check before the makeRequest() call mitigates that? Feels prudent to add.

Or is that something that we have to force onto clients of this library?

One thing we could do is have RateLimiter enforce its own timeouts, but I think that's a little too risky or unexpected. Re: your other comment, hopefully metrics/alerts cover our needs here (e.g. we can see a limit/source/customer is acting up and do something about it)
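
For reference, a minimal sketch of what building the context into the request looks like with the standard library (the helper name, URL parameter, and 30-second timeout are arbitrary illustrations, not part of this PR):

```go
import (
	"context"
	"net/http"
	"time"
)

// makeRequestWithDeadline attaches a deadline to the request itself, so both
// the rate limiter's ctx.Err checks and Go's HTTP client will observe it.
func makeRequestWithDeadline(parent context.Context, url string) (*http.Request, context.CancelFunc, error) {
	ctx, cancel := context.WithTimeout(parent, 30*time.Second)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		cancel()
		return nil, nil, err
	}
	return req, cancel, nil
}
```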

ctx context.Context,
req *http.Request,
makeRequest func() (*http.Response, error),
) (*http.Response, error) {
if len(rl.limits) == 0 {
return makeRequest()
}

now := time.Now()

for i, lim := range rl.limits {
if err := ctx.Err(); err != nil {
return nil, err
}

// [NOTE] It's maybe better to do this asynchronously, in case an errant
// limit sleeps for a duration instead of until a specific time, but
// I haven't thought through that.
if err := lim.Execute(ctx, req, now); err != nil {
return nil, fmt.Errorf(
Contributor:

I'm not sure how I'd mechanize this, but I think I'd really want some non-fatal observability. What's getting rate limited? How often? How much time are we spending sleeping? Plugging this into whatever it is we use for metrics and then setting some sanity check alerting would be really nice.

I'm worried that things will start to get rate limited, there won't be anything visible, and things will just slow down or completely block without us knowing.

Contributor Author:

Oh definitely 💯. One of the things I'm hoping this gives us is unified metrics, etc. on rate limiting. I think we wouldn't merge something like this without that; at least I wouldn't want to.


Something I was thinking about is how useful it is for limits to have names. Like, I had that thought and I was waiting for validation that it was correct, and I think this is it. The options I can think of for this are:

  • instead of accepting (effectively) an array of limits in NewRateLimiter, use a map[string]RateLimit parameter instead
  • require RateLimit to have a .Name method

If it's possible to have the same RateLimit in multiple RateLimiters, I don't really know what's useful here. It'd be annoying if two RateLimits had the same name (.Name would allow this unless we explicitly check for it), but maybe it's also annoying for the same RateLimit to have different names in different RateLimiters.

OK thinking through it, I'm leaning towards map[string]RateLimit:

  • it's easy to trace what name is assigned to a RateLimit in calls to NewRateLimiter
  • naming uniqueness is strongly signaled and subsequently enforced
  • names may be better, i.e. you're more likely to get "max 500r/month" as a name than "TokenBucketRateLimiter"

Yeah actually that last point pushes me over the edge, OK
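
A rough sketch of the map[string]RateLimit option discussed above (the NamedRateLimiter name and fields are hypothetical, not part of this PR):

```go
// NamedRateLimiter keys each limit by a caller-assigned name, which can serve
// as a metrics label and replace the index in error messages. Map iteration
// order is not deterministic, so errors should identify limits by name rather
// than by position.
type NamedRateLimiter struct {
	limits map[string]RateLimit
}

func NewNamedRateLimiter(limits map[string]RateLimit) *NamedRateLimiter {
	return &NamedRateLimiter{limits: limits}
}

// Example construction (the limit values are placeholders):
//
//	rl := NewNamedRateLimiter(map[string]RateLimit{
//		"max 500r/month": monthlyCap,
//		"5r/s burst 1":   perSecondLimit,
//	})
```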

"error executing rate limit policy %d: %w",
i,
err,
)
}
}

res, err := makeRequest()
if err != nil {
return nil, fmt.Errorf("error making HTTP request: %w", err)
}

// [NOTE] errgroup.Group oddly isn't what we want here. It presumes you want
// to stop all other processing if a single task fails (we don't), and
// that functionality is the only reason to use it instead of a
// WaitGroup.
wg := &sync.WaitGroup{}
updateErrorLock := &sync.Mutex{}
var updateError error

for i, lim := range rl.limits {
wg.Add(1)
go func(i int, lim RateLimit) {
defer wg.Done()

if err := lim.Update(ctx, res); err != nil {
err = fmt.Errorf("error updating rate limit policy %d: %w", i, err)

updateErrorLock.Lock()
if updateError == nil {
updateError = err
} else {
updateError = errors.Join(updateError, err)
}
updateErrorLock.Unlock()
}
}(i, lim)
}

wg.Wait()

if updateError != nil {
return nil, fmt.Errorf("error updating rate limits: %w", updateError)
Collaborator:

What kinds of errors might we see here?

Contributor Author:

Interesting question... hmmm, off the top of my head:

  • unexpected HTTP status
  • looked for a header/headers but didn't get one/them
  • found a header(s) but value was bad
    • this could be anything from "'90 seconds' is not an integer" to "the API asked me to wait six years... I'm not doing that"
  • updating the state (e.g. if we're persisting it somewhere) failed
  • context timed out
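
For illustration, a sketch of an Update implementation that could return several of these errors while reading rate limit headers from a response (the HeaderBackedLimit type and header names are placeholders, not part of this PR):

```go
import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"strconv"
	"sync"
	"time"
)

// HeaderBackedLimit is a hypothetical limit whose state comes entirely from
// response headers.
type HeaderBackedLimit struct {
	mu        sync.Mutex
	remaining int
	resetAt   time.Time
}

// Update parses X-RateLimit-Remaining / X-RateLimit-Reset and converts the
// reset epoch into an absolute, slightly padded time.
func (l *HeaderBackedLimit) Update(ctx context.Context, res *http.Response) error {
	if err := ctx.Err(); err != nil {
		return err // context timed out or was canceled
	}

	remaining := res.Header.Get("X-RateLimit-Remaining")
	reset := res.Header.Get("X-RateLimit-Reset")
	if remaining == "" || reset == "" {
		return errors.New("response is missing rate limit headers")
	}

	n, err := strconv.Atoi(remaining)
	if err != nil {
		return fmt.Errorf("X-RateLimit-Remaining %q is not an integer: %w", remaining, err)
	}
	epoch, err := strconv.ParseInt(reset, 10, 64)
	if err != nil {
		return fmt.Errorf("X-RateLimit-Reset %q is not an integer: %w", reset, err)
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	l.remaining = n
	l.resetAt = time.Unix(epoch, 0).Add(2 * time.Second) // pad the reset time a little
	return nil
}
```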

}

return res, nil
}
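
For completeness, a minimal usage sketch of this API, following the three steps in the RateLimiter doc comment (the fetchItems name, URL, and limit parameters are placeholders; the standard library context is shown for readability, and in practice the limiter would be built once rather than per call):

```go
import (
	"context"
	"io"
	"net/http"
)

// fetchItems is a hypothetical caller: build the limiter with its limits,
// call .Do instead of client.Do, then process the response as usual.
func fetchItems(ctx context.Context, client *http.Client, perSecondLimit, monthlyCap RateLimit) ([]byte, error) {
	rl := NewRateLimiter(perSecondLimit, monthlyCap)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.example.com/items", nil)
	if err != nil {
		return nil, err
	}

	// Do replaces the usual client.Do call: each limit decides whether (and
	// until when) to wait before makeRequest runs, then observes the response.
	res, err := rl.Do(ctx, req, func() (*http.Response, error) {
		return client.Do(req)
	})
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	return io.ReadAll(res.Body)
}
```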
99 changes: 75 additions & 24 deletions pkg/sources/postman/postman_client.go
@@ -7,6 +7,7 @@ import (
"net/http"
"time"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
"golang.org/x/time/rate"

@@ -183,6 +184,42 @@ type Response struct {
UID string `json:"uid,omitempty"`
}

// TokenBucketRateLimit implements a basic "requests per second with
// bursting" rate limiter.
type TokenBucketRateLimit struct {
limiter *rate.Limiter
}

// NewTokenBucketRateLimit creates a new TokenBucketRateLimit.
// lim: the sustained rate, in requests per second
// burst: the maximum number of requests that may be sent at once (the bucket size)
//
// This is a (very) thin wrapper around Google's rate limiter.
func NewTokenBucketRateLimit(lim rate.Limit, burst int) *TokenBucketRateLimit {
return &TokenBucketRateLimit{
limiter: rate.NewLimiter(rate.Limit(lim), burst),
}
}

func (tp *TokenBucketRateLimit) Execute(
ctx context.Context,
req *http.Request,
now time.Time,
) error {
if err := ctx.Err(); err != nil {
return err
}

return tp.limiter.Wait(ctx)
}

func (tp *TokenBucketRateLimit) Update(
ctx context.Context,
res *http.Response,
) error {
return nil
}

// A Client manages communication with the Postman API.
type Client struct {
// HTTP client used to communicate with the API
@@ -193,10 +230,10 @@ type Client struct {

// Rate limiter needed for Postman API workspace and collection requests. Postman API rate limit
// is 10 calls in 10 seconds for GET /collections, GET /workspaces, and GET /workspaces/{id} endpoints.
WorkspaceAndCollectionRateLimiter *rate.Limiter
WorkspaceAndCollectionRateLimiter *common.RateLimiter

// Rate limiter needed for Postman API. General rate limit is 300 requests per minute.
GeneralRateLimiter *rate.Limiter
GeneralRateLimiter *common.RateLimiter
}

// NewClient returns a new Postman API client.
@@ -208,10 +245,14 @@ func NewClient(postmanToken string) *Client {
}

c := &Client{
HTTPClient: http.DefaultClient,
Headers: bh,
WorkspaceAndCollectionRateLimiter: rate.NewLimiter(rate.Every(time.Second), 1),
GeneralRateLimiter: rate.NewLimiter(rate.Every(time.Second/5), 1),
HTTPClient: http.DefaultClient,
Headers: bh,
WorkspaceAndCollectionRateLimiter: common.NewRateLimiter(
NewTokenBucketRateLimit(rate.Every(time.Second), 1),
),
GeneralRateLimiter: common.NewRateLimiter(
NewTokenBucketRateLimit(rate.Every(time.Second/5), 1),
),
}

return c
@@ -247,13 +288,15 @@ func checkResponseStatus(r *http.Response) error {
}

// getPostmanResponseBodyBytes makes a request to the Postman API and returns the response body as bytes.
func (c *Client) getPostmanResponseBodyBytes(ctx context.Context, url string, headers map[string]string) ([]byte, error) {
func (c *Client) getPostmanResponseBodyBytes(ctx context.Context, url string, headers map[string]string, rl *common.RateLimiter) ([]byte, error) {
req, err := c.NewRequest(url, headers)
if err != nil {
return nil, err
}

resp, err := c.HTTPClient.Do(req)
resp, err := rl.Do(ctx, req, func() (*http.Response, error) {
return c.HTTPClient.Do(req)
})
if err != nil {
return nil, err
}
@@ -280,10 +323,12 @@ func (c *Client) EnumerateWorkspaces(ctx context.Context) ([]Workspace, error) {
Workspaces []Workspace `json:"workspaces"`
}{}

if err := c.WorkspaceAndCollectionRateLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("could not wait for rate limiter during workspaces enumeration getting: %w", err)
}
body, err := c.getPostmanResponseBodyBytes(ctx, "https://api.getpostman.com/workspaces", nil)
body, err := c.getPostmanResponseBodyBytes(
ctx,
"https://api.getpostman.com/workspaces",
nil,
c.WorkspaceAndCollectionRateLimiter,
)
if err != nil {
return nil, fmt.Errorf("could not get postman workspace response bytes during enumeration: %w", err)
}
@@ -315,10 +360,12 @@ func (c *Client) GetWorkspace(ctx context.Context, workspaceUUID string) (Worksp
}{}

url := fmt.Sprintf(WORKSPACE_URL, workspaceUUID)
if err := c.WorkspaceAndCollectionRateLimiter.Wait(ctx); err != nil {
return Workspace{}, fmt.Errorf("could not wait for rate limiter during workspace getting: %w", err)
}
body, err := c.getPostmanResponseBodyBytes(ctx, url, nil)
body, err := c.getPostmanResponseBodyBytes(
ctx,
url,
nil,
c.WorkspaceAndCollectionRateLimiter,
)
if err != nil {
return Workspace{}, fmt.Errorf("could not get postman workspace (%s) response bytes: %w", workspaceUUID, err)
}
@@ -336,10 +383,12 @@ func (c *Client) GetEnvironmentVariables(ctx context.Context, environment_uuid s
}{}

url := fmt.Sprintf(ENVIRONMENTS_URL, environment_uuid)
if err := c.GeneralRateLimiter.Wait(ctx); err != nil {
return VariableData{}, fmt.Errorf("could not wait for rate limiter during environment variable getting: %w", err)
}
body, err := c.getPostmanResponseBodyBytes(ctx, url, nil)
body, err := c.getPostmanResponseBodyBytes(
ctx,
url,
nil,
c.GeneralRateLimiter,
)
if err != nil {
return VariableData{}, fmt.Errorf("could not get postman environment (%s) response bytes: %w", environment_uuid, err)
}
@@ -357,10 +406,12 @@ func (c *Client) GetCollection(ctx context.Context, collection_uuid string) (Col
}{}

url := fmt.Sprintf(COLLECTIONS_URL, collection_uuid)
if err := c.WorkspaceAndCollectionRateLimiter.Wait(ctx); err != nil {
return Collection{}, fmt.Errorf("could not wait for rate limiter during collection getting: %w", err)
}
body, err := c.getPostmanResponseBodyBytes(ctx, url, nil)
body, err := c.getPostmanResponseBodyBytes(
ctx,
url,
nil,
c.WorkspaceAndCollectionRateLimiter,
)
if err != nil {
return Collection{}, fmt.Errorf("could not get postman collection (%s) response bytes: %w", collection_uuid, err)
}