Skip to content

Commit 3825ef5

Browse files
authored
feat(realtime): Add realtime support (#146)
* feat(realtime): Add realtime support
1 parent dacb607 commit 3825ef5

File tree

7 files changed

+217
-5
lines changed

7 files changed

+217
-5
lines changed

client.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,16 @@ func NewClient(apiKey string, options ...Option) *Client {
8989
if !strings.HasPrefix(apiKey, "ser.") {
9090
panic("In order to use local evaluation, please generate a server key in the environment settings page.")
9191
}
92-
93-
go c.pollEnvironment(c.ctxLocalEval)
92+
if c.config.useRealtime {
93+
go c.startRealtimeUpdates(c.ctxLocalEval)
94+
} else {
95+
go c.pollEnvironment(c.ctxLocalEval)
96+
}
9497
}
9598
// Initialize analytics processor
9699
if c.config.enableAnalytics {
97100
c.analyticsProcessor = NewAnalyticsProcessor(c.ctxAnalytics, c.client, c.config.baseURL, nil, c.log)
98101
}
99-
100102
return c
101103
}
102104

@@ -331,7 +333,6 @@ func (c *Client) pollEnvironment(ctx context.Context) {
331333
}
332334
}
333335
}
334-
335336
func (c *Client) UpdateEnvironment(ctx context.Context) error {
336337
var env environments.EnvironmentModel
337338
resp, err := c.client.NewRequest().

client_test.go

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"sync"
1111
"testing"
1212
"time"
13-
1413
flagsmith "github.com/Flagsmith/flagsmith-go-client/v4"
1514
"github.com/Flagsmith/flagsmith-go-client/v4/fixtures"
1615
"github.com/stretchr/testify/assert"
@@ -200,6 +199,7 @@ func TestGetFlags(t *testing.T) {
200199
assert.Equal(t, fixtures.Feature1Name, allFlags[0].FeatureName)
201200
assert.Equal(t, fixtures.Feature1ID, allFlags[0].FeatureID)
202201
assert.Equal(t, fixtures.Feature1Value, allFlags[0].Value)
202+
203203
}
204204

205205
func TestGetFlagsTransientIdentity(t *testing.T) {
@@ -861,3 +861,101 @@ func TestPollErrorHandlerIsUsedWhenPollFails(t *testing.T) {
861861
assert.Equal(t, statusCode, 500)
862862
assert.Equal(t, status, "500 Internal Server Error")
863863
}
864+
865+
func TestRealtime(t *testing.T) {
866+
// Given
867+
mux := http.NewServeMux()
868+
requestCount := struct {
869+
mu sync.Mutex
870+
count int
871+
}{}
872+
873+
mux.HandleFunc("/api/v1/environment-document/", func(rw http.ResponseWriter, req *http.Request) {
874+
assert.Equal(t, "GET", req.Method)
875+
assert.Equal(t, fixtures.EnvironmentAPIKey, req.Header.Get("X-Environment-Key"))
876+
requestCount.mu.Lock()
877+
requestCount.count++
878+
requestCount.mu.Unlock()
879+
880+
rw.Header().Set("Content-Type", "application/json")
881+
rw.WriteHeader(http.StatusOK)
882+
_, err := io.WriteString(rw, fixtures.EnvironmentJson)
883+
if err != nil {
884+
panic(err)
885+
}
886+
assert.NoError(t, err)
887+
})
888+
mux.HandleFunc(fmt.Sprintf("/sse/environments/%s/stream", fixtures.ClientAPIKey), func(rw http.ResponseWriter, req *http.Request) {
889+
assert.Equal(t, "GET", req.Method)
890+
891+
// Set the necessary headers for SSE
892+
rw.Header().Set("Content-Type", "text/event-stream")
893+
rw.Header().Set("Cache-Control", "no-cache")
894+
rw.Header().Set("Connection", "keep-alive")
895+
896+
// Flush headers to the client
897+
flusher, _ := rw.(http.Flusher)
898+
flusher.Flush()
899+
900+
// Use an `updated_at` value that is older than the `updated_at` set on the environment document
901+
// to ensure an older timestamp does not trigger an update.
902+
sendUpdatedAtSSEEvent(rw, flusher, 1640995200.079725)
903+
time.Sleep(10 * time.Millisecond)
904+
905+
// Update the `updated_at`(to trigger the environment update)
906+
sendUpdatedAtSSEEvent(rw, flusher, 1733480514.079725)
907+
time.Sleep(10 * time.Millisecond)
908+
})
909+
910+
ctx := context.Background()
911+
912+
server := httptest.NewServer(mux)
913+
defer server.Close()
914+
915+
// When
916+
client := flagsmith.NewClient(fixtures.EnvironmentAPIKey,
917+
flagsmith.WithBaseURL(server.URL+"/api/v1/"),
918+
flagsmith.WithLocalEvaluation(ctx),
919+
flagsmith.WithRealtime(),
920+
flagsmith.WithRealtimeBaseURL(server.URL+"/"),
921+
)
922+
// Sleep to ensure that the server has time to update the environment
923+
time.Sleep(10 * time.Millisecond)
924+
925+
flags, err := client.GetFlags(ctx, nil)
926+
927+
// Then
928+
assert.NoError(t, err)
929+
930+
allFlags := flags.AllFlags()
931+
932+
assert.Equal(t, 1, len(allFlags))
933+
934+
assert.Equal(t, fixtures.Feature1Name, allFlags[0].FeatureName)
935+
assert.Equal(t, fixtures.Feature1ID, allFlags[0].FeatureID)
936+
assert.Equal(t, fixtures.Feature1Value, allFlags[0].Value)
937+
938+
// Sleep to ensure that the server has time to update the environment
939+
// (After the second sse event)
940+
time.Sleep(10 * time.Millisecond)
941+
942+
requestCount.mu.Lock()
943+
assert.Equal(t, 2, requestCount.count)
944+
}
945+
func sendUpdatedAtSSEEvent(rw http.ResponseWriter, flusher http.Flusher, updatedAt float64) {
946+
// Format the SSE event with the provided updatedAt value
947+
sseEvent := fmt.Sprintf(`event: environment_updated
948+
data: {"updated_at": %f}
949+
950+
`, updatedAt)
951+
952+
// Write the SSE event to the response
953+
_, err := io.WriteString(rw, sseEvent)
954+
if err != nil {
955+
http.Error(rw, "Failed to send SSE event", http.StatusInternalServerError)
956+
return
957+
}
958+
959+
// Flush the event to the client
960+
flusher.Flush()
961+
}

config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const (
1313
DefaultBaseURL = "https://edge.api.flagsmith.com/api/v1/"
1414

1515
bulkIdentifyMaxCount = 100
16+
DefaultRealtimeBaseUrl = "https://realtime.flagsmith.com/"
1617
)
1718

1819
// config contains all configurable Client settings.
@@ -23,6 +24,8 @@ type config struct {
2324
envRefreshInterval time.Duration
2425
enableAnalytics bool
2526
offlineMode bool
27+
realtimeBaseUrl string
28+
useRealtime bool
2629
}
2730

2831
// defaultConfig returns default configuration.
@@ -31,5 +34,6 @@ func defaultConfig() config {
3134
baseURL: DefaultBaseURL,
3235
timeout: DefaultTimeout,
3336
envRefreshInterval: time.Second * 60,
37+
realtimeBaseUrl: DefaultRealtimeBaseUrl,
3438
}
3539
}

fixtures/fixture.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ const Feature1Name = "feature_1"
1212
const Feature1ID = 1
1313

1414
const Feature1OverriddenValue = "some-overridden-value"
15+
const ClientAPIKey = "B62qaMZNwfiqT76p38ggrQ"
1516

1617
const EnvironmentJson = `
1718
{
1819
"api_key": "B62qaMZNwfiqT76p38ggrQ",
20+
"updated_at": "2023-12-06T10:21:54.079725Z",
1921
"project": {
2022
"name": "Test project",
2123
"organisation": {

flagengine/environments/models.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/features"
55
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/identities"
66
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/projects"
7+
"time"
78
)
89

910
type EnvironmentModel struct {
@@ -12,4 +13,5 @@ type EnvironmentModel struct {
1213
Project *projects.ProjectModel `json:"project"`
1314
FeatureStates []*features.FeatureStateModel `json:"feature_states"`
1415
IdentityOverrides []*identities.IdentityModel `json:"identity_overrides"`
16+
UpdatedAt time.Time `json:"updated_at"`
1517
}

options.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package flagsmith
22

33
import (
44
"context"
5+
"strings"
56
"time"
67
)
78

@@ -19,6 +20,8 @@ var _ = []Option{
1920
WithCustomHeaders(nil),
2021
WithDefaultHandler(nil),
2122
WithProxy(""),
23+
WithRealtime(),
24+
WithRealtimeBaseURL(""),
2225
}
2326

2427
func WithBaseURL(url string) Option {
@@ -124,3 +127,22 @@ func WithErrorHandler(handler func(handler *FlagsmithAPIError)) Option {
124127
c.errorHandler = handler
125128
}
126129
}
130+
131+
// WithRealtime returns an Option function that enables real-time updates for the Client.
132+
// NOTE: Before enabling real-time updates, ensure that local evaluation is enabled.
133+
func WithRealtime() Option {
134+
return func(c *Client) {
135+
c.config.useRealtime = true
136+
}
137+
}
138+
139+
// WithRealtimeBaseURL returns an Option function for configuring the real-time base URL of the Client.
140+
func WithRealtimeBaseURL(url string) Option {
141+
return func(c *Client) {
142+
// Ensure the URL ends with a trailing slash
143+
if !strings.HasSuffix(url, "/") {
144+
url += "/"
145+
}
146+
c.config.realtimeBaseUrl = url
147+
}
148+
}

realtime.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package flagsmith
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"encoding/json"
7+
"errors"
8+
"net/http"
9+
"strings"
10+
"time"
11+
12+
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/environments"
13+
)
14+
15+
func (c *Client) startRealtimeUpdates(ctx context.Context) {
16+
err := c.UpdateEnvironment(ctx)
17+
if err != nil {
18+
panic("Failed to fetch the environment while configuring real-time updates")
19+
}
20+
env, _ := c.environment.Load().(*environments.EnvironmentModel)
21+
stream_url := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
22+
envUpdatedAt := env.UpdatedAt
23+
for {
24+
select {
25+
case <-ctx.Done():
26+
return
27+
default:
28+
resp, err := http.Get(stream_url)
29+
if err != nil {
30+
c.log.Errorf("Error connecting to realtime server: %v", err)
31+
continue
32+
}
33+
defer resp.Body.Close()
34+
35+
scanner := bufio.NewScanner(resp.Body)
36+
for scanner.Scan() {
37+
line := scanner.Text()
38+
if strings.HasPrefix(line, "data: ") {
39+
parsedTime, err := parseUpdatedAtFromSSE(line)
40+
if err != nil {
41+
c.log.Errorf("Error reading realtime stream: %v", err)
42+
continue
43+
}
44+
if parsedTime.After(envUpdatedAt) {
45+
err = c.UpdateEnvironment(ctx)
46+
if err != nil {
47+
c.log.Errorf("Failed to update the environment: %v", err)
48+
continue
49+
}
50+
env, _ := c.environment.Load().(*environments.EnvironmentModel)
51+
52+
envUpdatedAt = env.UpdatedAt
53+
}
54+
}
55+
}
56+
if err := scanner.Err(); err != nil {
57+
c.log.Errorf("Error reading realtime stream: %v", err)
58+
}
59+
}
60+
}
61+
}
62+
func parseUpdatedAtFromSSE(line string) (time.Time, error) {
63+
var eventData struct {
64+
UpdatedAt float64 `json:"updated_at"`
65+
}
66+
67+
data := strings.TrimPrefix(line, "data: ")
68+
err := json.Unmarshal([]byte(data), &eventData)
69+
if err != nil {
70+
return time.Time{}, errors.New("failed to parse event data: " + err.Error())
71+
}
72+
73+
if eventData.UpdatedAt <= 0 {
74+
return time.Time{}, errors.New("invalid 'updated_at' value in event data")
75+
}
76+
77+
// Convert the float timestamp into seconds and nanoseconds
78+
seconds := int64(eventData.UpdatedAt)
79+
nanoseconds := int64((eventData.UpdatedAt - float64(seconds)) * 1e9)
80+
81+
// Return the parsed time
82+
return time.Unix(seconds, nanoseconds), nil
83+
}

0 commit comments

Comments
 (0)