Skip to content

Commit ac2e06e

Browse files
authored
Merge pull request #154 from Flagsmith/feat/realtime-poll
feat: Realtime improvements
2 parents 500950d + 077e0ef commit ac2e06e

File tree

7 files changed

+289
-49
lines changed

7 files changed

+289
-49
lines changed

backoff.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package flagsmith
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
const (
9+
initialBackoff = 200 * time.Millisecond
10+
maxBackoff = 30 * time.Second
11+
)
12+
13+
// backoff handles exponential backoff with jitter.
14+
type backoff struct {
15+
current time.Duration
16+
}
17+
18+
// newBackoff creates a new backoff instance.
19+
func newBackoff() *backoff {
20+
return &backoff{
21+
current: initialBackoff,
22+
}
23+
}
24+
25+
// next returns the next backoff duration and updates the current backoff.
26+
func (b *backoff) next() time.Duration {
27+
// Add jitter between 0-1s
28+
backoff := b.current + time.Duration(time.Now().UnixNano()%1e9)
29+
30+
// Double the backoff time, but cap it
31+
if b.current < maxBackoff {
32+
b.current *= 2
33+
}
34+
35+
return backoff
36+
}
37+
38+
// reset resets the backoff to initial value.
39+
func (b *backoff) reset() {
40+
b.current = initialBackoff
41+
}
42+
43+
// wait waits for the current backoff time, or until ctx is done.
44+
func (b *backoff) wait(ctx context.Context) {
45+
select {
46+
case <-ctx.Done():
47+
return
48+
case <-time.After(b.next()):
49+
}
50+
}

backoff_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package flagsmith
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestBackoff(t *testing.T) {
10+
// Given
11+
b := newBackoff()
12+
13+
// When
14+
first := b.next()
15+
second := b.next()
16+
third := b.next()
17+
18+
// Then
19+
assert.LessOrEqual(t, third, maxBackoff, "Backoff should not exceed max")
20+
21+
// Backoff increases across attempts
22+
assert.Greater(t, second, first, "Second backoff should be greater than the first")
23+
assert.Greater(t, third, second, "Third backoff should be greater than the second")
24+
}
25+
26+
func TestBackoffReset(t *testing.T) {
27+
b := newBackoff()
28+
assert.Greater(t, b.next(), initialBackoff)
29+
b.reset()
30+
assert.Equal(t, initialBackoff, b.current, "Reset should return to initial backoff")
31+
}

client.go

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Client struct {
3030
identitiesWithOverrides atomic.Value
3131

3232
analyticsProcessor *AnalyticsProcessor
33+
realtime *realtime
3334
defaultFlagHandler func(string) (Flag, error)
3435

3536
client *resty.Client
@@ -76,7 +77,7 @@ func NewClient(apiKey string, options ...Option) *Client {
7677
OnBeforeRequest(newRestyLogRequestMiddleware(c.log)).
7778
OnAfterResponse(newRestyLogResponseMiddleware(c.log))
7879

79-
c.log.Debug("initialising Flagsmith client",
80+
c.log.Info("initialising Flagsmith client",
8081
"base_url", c.config.baseURL,
8182
"local_evaluation", c.config.localEvaluation,
8283
"offline", c.config.offlineMode,
@@ -104,10 +105,13 @@ func NewClient(apiKey string, options ...Option) *Client {
104105
if !strings.HasPrefix(apiKey, "ser.") {
105106
panic("In order to use local evaluation, please generate a server key in the environment settings page.")
106107
}
108+
if c.config.polling || !c.config.useRealtime {
109+
// Poll indefinitely
110+
go c.pollEnvironment(c.ctxLocalEval, true)
111+
}
107112
if c.config.useRealtime {
108-
go c.startRealtimeUpdates(c.ctxLocalEval)
109-
} else {
110-
go c.pollEnvironment(c.ctxLocalEval)
113+
// Poll until we get the environment once
114+
go c.pollThenStartRealtime(c.ctxLocalEval)
111115
}
112116
}
113117
// Initialise analytics processor
@@ -336,26 +340,76 @@ func (c *Client) getEnvironmentFlagsFromEnvironment() (Flags, error) {
336340
), nil
337341
}
338342

339-
func (c *Client) pollEnvironment(ctx context.Context) {
343+
func (c *Client) pollEnvironment(ctx context.Context, pollForever bool) {
344+
log := c.log.With(slog.String("worker", "poll"))
340345
update := func() {
341-
ctx, cancel := context.WithTimeout(ctx, c.config.envRefreshInterval)
346+
log.Debug("polling environment")
347+
ctx, cancel := context.WithTimeout(ctx, c.config.timeout)
342348
defer cancel()
343349
err := c.UpdateEnvironment(ctx)
344350
if err != nil {
345-
c.log.Error("failed to update environment", "error", err)
351+
log.Error("failed to update environment", "error", err)
346352
}
347353
}
348354
update()
349355
ticker := time.NewTicker(c.config.envRefreshInterval)
356+
defer func() {
357+
ticker.Stop()
358+
log.Info("polling stopped")
359+
}()
350360
for {
351361
select {
352362
case <-ticker.C:
363+
if !pollForever {
364+
// Check if environment was successfully fetched
365+
if _, ok := c.environment.Load().(*environments.EnvironmentModel); ok {
366+
if !pollForever {
367+
c.log.Debug("environment initialised")
368+
return
369+
}
370+
}
371+
}
353372
update()
354373
case <-ctx.Done():
355374
return
356375
}
357376
}
358377
}
378+
379+
func (c *Client) pollThenStartRealtime(ctx context.Context) {
380+
b := newBackoff()
381+
update := func() {
382+
c.log.Debug("polling environment")
383+
ctx, cancel := context.WithTimeout(ctx, c.config.envRefreshInterval)
384+
defer cancel()
385+
err := c.UpdateEnvironment(ctx)
386+
if err != nil {
387+
c.log.Error("failed to update environment", "error", err)
388+
b.wait(ctx)
389+
}
390+
}
391+
update()
392+
defer func() {
393+
c.log.Info("initial polling stopped")
394+
}()
395+
for {
396+
select {
397+
case <-ctx.Done():
398+
return
399+
default:
400+
// If environment was fetched, start realtime and finish
401+
if env, ok := c.environment.Load().(*environments.EnvironmentModel); ok {
402+
streamURL := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
403+
c.log.Debug("environment initialised, starting realtime updates")
404+
c.realtime = newRealtime(c, ctx, streamURL, env.UpdatedAt)
405+
go c.realtime.start()
406+
return
407+
}
408+
update()
409+
}
410+
}
411+
}
412+
359413
func (c *Client) UpdateEnvironment(ctx context.Context) error {
360414
var env environments.EnvironmentModel
361415
resp, err := c.client.NewRequest().
@@ -380,14 +434,22 @@ func (c *Client) UpdateEnvironment(ctx context.Context) error {
380434
}
381435
return f
382436
}
437+
isNew := false
438+
previousEnv := c.environment.Load()
439+
if previousEnv == nil || env.UpdatedAt.After(previousEnv.(*environments.EnvironmentModel).UpdatedAt) {
440+
isNew = true
441+
}
383442
c.environment.Store(&env)
384443
identitiesWithOverrides := make(map[string]identities.IdentityModel)
385444
for _, id := range env.IdentityOverrides {
386445
identitiesWithOverrides[id.Identifier] = *id
387446
}
388447
c.identitiesWithOverrides.Store(identitiesWithOverrides)
389448

390-
c.log.Info("environment updated", "environment", env.APIKey)
449+
if isNew {
450+
c.log.Info("environment updated", "environment", env.APIKey, "updated_at", env.UpdatedAt)
451+
}
452+
391453
return nil
392454
}
393455

client_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -977,3 +977,45 @@ func TestWithSlogLogger(t *testing.T) {
977977
t.Log(logStr)
978978
assert.Contains(t, logStr, "initialising Flagsmith client")
979979
}
980+
981+
func TestWithPollingWorksWithRealtime(t *testing.T) {
982+
ctx := context.Background()
983+
server := httptest.NewServer(http.HandlerFunc(fixtures.EnvironmentDocumentHandler))
984+
defer server.Close()
985+
986+
// guard against data race from goroutines logging at the same time
987+
var logOutput strings.Builder
988+
var logMu sync.Mutex
989+
slogLogger := slog.New(slog.NewTextHandler(writerFunc(func(p []byte) (n int, err error) {
990+
logMu.Lock()
991+
defer logMu.Unlock()
992+
return logOutput.Write(p)
993+
}), &slog.HandlerOptions{
994+
Level: slog.LevelDebug,
995+
}))
996+
997+
// Given
998+
_ = flagsmith.NewClient(fixtures.EnvironmentAPIKey,
999+
flagsmith.WithSlogLogger(slogLogger),
1000+
flagsmith.WithLocalEvaluation(ctx),
1001+
flagsmith.WithRealtime(),
1002+
flagsmith.WithPolling(),
1003+
flagsmith.WithBaseURL(server.URL+"/api/v1/"))
1004+
1005+
// When
1006+
time.Sleep(500 * time.Millisecond)
1007+
1008+
// Then
1009+
logMu.Lock()
1010+
logStr := logOutput.String()
1011+
logMu.Unlock()
1012+
assert.Contains(t, logStr, "worker=poll")
1013+
assert.Contains(t, logStr, "worker=realtime")
1014+
}
1015+
1016+
// writerFunc implements io.Writer.
1017+
type writerFunc func(p []byte) (n int, err error)
1018+
1019+
func (f writerFunc) Write(p []byte) (n int, err error) {
1020+
return f(p)
1021+
}

config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type config struct {
2626
offlineMode bool
2727
realtimeBaseUrl string
2828
useRealtime bool
29+
polling bool
2930
}
3031

3132
// defaultConfig returns default configuration.

options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ var _ = []Option{
2222
WithCustomHeaders(nil),
2323
WithDefaultHandler(nil),
2424
WithProxy(""),
25+
WithPolling(),
2526
WithRealtime(),
2627
WithRealtimeBaseURL(""),
2728
WithLogger(nil),
@@ -157,3 +158,10 @@ func WithRealtimeBaseURL(url string) Option {
157158
c.config.realtimeBaseUrl = url
158159
}
159160
}
161+
162+
// WithPolling makes it so that the client will poll for updates even when WithRealtime is used.
163+
func WithPolling() Option {
164+
return func(c *Client) {
165+
c.config.polling = true
166+
}
167+
}

0 commit comments

Comments
 (0)