Skip to content

Commit b058543

Browse files
authored
Merge pull request #7359 from onflow/peter/7337-jsoncdc-encode-websockets-v0.40
[Access] Update websockets events and account status reponses to use json-cdc
2 parents 6635a21 + e57f23b commit b058543

File tree

4 files changed

+143
-10
lines changed

4 files changed

+143
-10
lines changed

engine/access/rest/websockets/data_providers/account_statuses_provider.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,25 @@ func NewAccountStatusesDataProvider(
8383
func (p *AccountStatusesDataProvider) Run() error {
8484
return run(
8585
p.createAndStartSubscription(p.ctx, p.arguments),
86-
func(response *backend.AccountStatusesResponse) error {
87-
return p.sendResponse(response)
88-
},
86+
p.handleResponse,
8987
)
9088
}
9189

90+
// handleResponse processes the response from the subscription and sends it to the client's channel.
91+
// As part of the processing, it converts the event payloads from CCF to JSON-CDC format.
92+
// This function is not expected to be called concurrently.
93+
//
94+
// No errors expected during normal operations.
95+
func (p *AccountStatusesDataProvider) handleResponse(response *backend.AccountStatusesResponse) error {
96+
// convert events to JSON-CDC format
97+
convertedResponse, err := convertAccountStatusesResponse(response)
98+
if err != nil {
99+
return fmt.Errorf("failed to convert account status events to JSON-CDC format: %w", err)
100+
}
101+
102+
return p.sendResponse(convertedResponse)
103+
}
104+
92105
// sendResponse processes an account statuses message and sends it to data provider's channel.
93106
// This function is not safe to call concurrently.
94107
//
@@ -133,6 +146,27 @@ func (p *AccountStatusesDataProvider) createAndStartSubscription(
133146
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter)
134147
}
135148

149+
// convertAccountStatusesResponse converts events in the provided AccountStatusesResponse from CCF
150+
// to JSON-CDC format.
151+
//
152+
// No errors expected during normal operations.
153+
func convertAccountStatusesResponse(resp *backend.AccountStatusesResponse) (*backend.AccountStatusesResponse, error) {
154+
jsoncdcEvents := make(map[string]flow.EventsList, len(resp.AccountEvents))
155+
for eventType, events := range resp.AccountEvents {
156+
convertedEvents, err := convertEvents(events)
157+
if err != nil {
158+
return nil, fmt.Errorf("failed to convert %s events to JSON-CDC: %w", eventType, err)
159+
}
160+
jsoncdcEvents[eventType] = convertedEvents
161+
}
162+
163+
return &backend.AccountStatusesResponse{
164+
BlockID: resp.BlockID,
165+
Height: resp.Height,
166+
AccountEvents: jsoncdcEvents,
167+
}, nil
168+
}
169+
136170
// parseAccountStatusesArguments validates and initializes the account statuses arguments.
137171
func parseAccountStatusesArguments(
138172
arguments wsmodels.Arguments,

engine/access/rest/websockets/data_providers/account_statuses_provider_test.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/onflow/flow/protobuf/go/flow/entities"
1011
"github.com/rs/zerolog"
1112
"github.com/stretchr/testify/mock"
1213
"github.com/stretchr/testify/require"
@@ -18,8 +19,10 @@ import (
1819
"github.com/onflow/flow-go/engine/access/state_stream/backend"
1920
ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock"
2021
"github.com/onflow/flow-go/engine/access/subscription"
22+
"github.com/onflow/flow-go/engine/common/rpc/convert"
2123
"github.com/onflow/flow-go/model/flow"
2224
"github.com/onflow/flow-go/utils/unittest"
25+
"github.com/onflow/flow-go/utils/unittest/generator"
2326
)
2427

2528
// AccountStatusesProviderSuite is a test suite for testing the account statuses providers functionality.
@@ -66,11 +69,16 @@ func (s *AccountStatusesProviderSuite) SetupTest() {
6669
// validates that events are correctly streamed to the channel and ensures
6770
// no unexpected errors occur.
6871
func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_HappyPath() {
72+
eventGenerator := generator.EventGenerator(generator.WithEncoding(entities.EventEncodingVersion_CCF_V0))
6973
events := []flow.Event{
70-
unittest.EventFixture(state_stream.CoreEventAccountCreated, 0, 0, unittest.IdentifierFixture(), 0),
71-
unittest.EventFixture(state_stream.CoreEventAccountKeyAdded, 0, 0, unittest.IdentifierFixture(), 0),
74+
eventGenerator.New(),
75+
eventGenerator.New(),
7276
}
7377

78+
// use account status events
79+
events[0].Type = state_stream.CoreEventAccountCreated
80+
events[1].Type = state_stream.CoreEventAccountKeyAdded
81+
7482
backendResponses := s.backendAccountStatusesResponses(events)
7583

7684
testHappyPath(
@@ -190,7 +198,25 @@ func (s *AccountStatusesProviderSuite) expectedAccountStatusesResponses(backendR
190198
expectedResponses := make([]interface{}, len(backendResponses))
191199

192200
for i, resp := range backendResponses {
193-
expectedResponsePayload := models.NewAccountStatusesResponse(resp, uint64(i))
201+
// avoid updating the original response
202+
expected := &backend.AccountStatusesResponse{
203+
Height: resp.Height,
204+
BlockID: resp.BlockID,
205+
AccountEvents: make(map[string]flow.EventsList, len(resp.AccountEvents)),
206+
}
207+
208+
// events are provided in CCF format, but we expect all event payloads in JSON-CDC format
209+
for eventType, events := range resp.AccountEvents {
210+
convertedEvents := make([]flow.Event, len(events))
211+
for j, event := range events {
212+
converted, err := convert.CcfEventToJsonEvent(event)
213+
s.Require().NoError(err)
214+
convertedEvents[j] = *converted
215+
}
216+
expected.AccountEvents[eventType] = convertedEvents
217+
}
218+
219+
expectedResponsePayload := models.NewAccountStatusesResponse(expected, uint64(i))
194220
expectedResponses[i] = &models.BaseDataProvidersResponse{
195221
Topic: AccountStatusesTopic,
196222
Payload: expectedResponsePayload,

engine/access/rest/websockets/data_providers/events_provider.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/onflow/flow-go/engine/access/state_stream"
1313
"github.com/onflow/flow-go/engine/access/state_stream/backend"
1414
"github.com/onflow/flow-go/engine/access/subscription"
15+
"github.com/onflow/flow-go/engine/common/rpc/convert"
1516
"github.com/onflow/flow-go/model/flow"
1617
"github.com/onflow/flow-go/module/counters"
1718
)
@@ -84,10 +85,25 @@ func NewEventsDataProvider(
8485
func (p *EventsDataProvider) Run() error {
8586
return run(
8687
p.createAndStartSubscription(p.ctx, p.arguments),
87-
p.sendResponse,
88+
p.handleResponse,
8889
)
8990
}
9091

92+
// handleResponse processes the response from the subscription and sends it to the client's channel.
93+
// As part of the processing, it converts the event payloads from CCF to JSON-CDC format.
94+
// This function is not expected to be called concurrently.
95+
//
96+
// No errors expected during normal operations.
97+
func (p *EventsDataProvider) handleResponse(response *backend.EventsResponse) error {
98+
// convert events to JSON-CDC format
99+
convertedResponse, err := convertEventsResponse(response)
100+
if err != nil {
101+
return fmt.Errorf("failed to convert events to JSON-CDC format: %w", err)
102+
}
103+
104+
return p.sendResponse(convertedResponse)
105+
}
106+
91107
// sendResponse processes an event message and sends it to client's channel.
92108
// This function is not expected to be called concurrently.
93109
//
@@ -129,6 +145,43 @@ func (p *EventsDataProvider) createAndStartSubscription(ctx context.Context, arg
129145
return p.stateStreamApi.SubscribeEventsFromLatest(ctx, args.Filter)
130146
}
131147

148+
// convertEventsResponse converts events in the provided EventsResponse from CCF to JSON-CDC format.
149+
//
150+
// No errors expected during normal operations.
151+
func convertEventsResponse(resp *backend.EventsResponse) (*backend.EventsResponse, error) {
152+
jsoncdcEvents, err := convertEvents(resp.Events)
153+
if err != nil {
154+
return nil, fmt.Errorf("failed to convert events to JSON-CDC: %w", err)
155+
}
156+
157+
return &backend.EventsResponse{
158+
BlockID: resp.BlockID,
159+
Height: resp.Height,
160+
BlockTimestamp: resp.BlockTimestamp,
161+
Events: jsoncdcEvents,
162+
}, nil
163+
}
164+
165+
// convertEvents converts a slice events with CCF encoded payloads into a slice of new events who's
166+
// payloads are encoded in JSON-CDC format.
167+
//
168+
// Note: this function creates a copy of the original events before converting the payload. This
169+
// is important to ensure the original data structure is not modified, which could impact data held
170+
// in caches.
171+
//
172+
// No errors expected during normal operations.
173+
func convertEvents(ccfEvents []flow.Event) ([]flow.Event, error) {
174+
jsoncdcEvents := make([]flow.Event, len(ccfEvents))
175+
for i, ccfEvent := range ccfEvents {
176+
converted, err := convert.CcfEventToJsonEvent(ccfEvent)
177+
if err != nil {
178+
return nil, fmt.Errorf("failed to convert event %d: %w", i, err)
179+
}
180+
jsoncdcEvents[i] = *converted
181+
}
182+
return jsoncdcEvents, nil
183+
}
184+
132185
// parseEventsArguments validates and initializes the events arguments.
133186
func parseEventsArguments(
134187
arguments wsmodels.Arguments,

engine/access/rest/websockets/data_providers/events_provider_test.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/onflow/flow/protobuf/go/flow/entities"
1011
"github.com/rs/zerolog"
1112
"github.com/stretchr/testify/mock"
1213
"github.com/stretchr/testify/suite"
@@ -17,8 +18,10 @@ import (
1718
"github.com/onflow/flow-go/engine/access/state_stream/backend"
1819
ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock"
1920
"github.com/onflow/flow-go/engine/access/subscription"
21+
"github.com/onflow/flow-go/engine/common/rpc/convert"
2022
"github.com/onflow/flow-go/model/flow"
2123
"github.com/onflow/flow-go/utils/unittest"
24+
"github.com/onflow/flow-go/utils/unittest/generator"
2225
)
2326

2427
// EventsProviderSuite is a test suite for testing the events providers functionality.
@@ -65,9 +68,10 @@ func (s *EventsProviderSuite) SetupTest() {
6568
// validates that events are correctly streamed to the channel and ensures
6669
// no unexpected errors occur.
6770
func (s *EventsProviderSuite) TestEventsDataProvider_HappyPath() {
71+
eventGenerator := generator.EventGenerator(generator.WithEncoding(entities.EventEncodingVersion_CCF_V0))
6872
events := []flow.Event{
69-
unittest.EventFixture(flow.EventAccountCreated, 0, 0, unittest.IdentifierFixture(), 0),
70-
unittest.EventFixture(flow.EventAccountUpdated, 0, 0, unittest.IdentifierFixture(), 0),
73+
eventGenerator.New(),
74+
eventGenerator.New(),
7175
}
7276

7377
backendResponses := s.backendEventsResponses(events)
@@ -182,7 +186,23 @@ func (s *EventsProviderSuite) expectedEventsResponses(
182186
expectedResponses := make([]interface{}, len(backendResponses))
183187

184188
for i, resp := range backendResponses {
185-
expectedResponsePayload := models.NewEventResponse(resp, uint64(i))
189+
// avoid updating the original response
190+
expected := &backend.EventsResponse{
191+
Height: resp.Height,
192+
BlockID: resp.BlockID,
193+
BlockTimestamp: resp.BlockTimestamp,
194+
Events: make(flow.EventsList, len(resp.Events)),
195+
}
196+
197+
// events are provided in CCF format, but we expect all event payloads in JSON-CDC format
198+
for i, event := range resp.Events {
199+
converted, err := convert.CcfEventToJsonEvent(event)
200+
s.Require().NoError(err)
201+
202+
expected.Events[i] = *converted
203+
}
204+
205+
expectedResponsePayload := models.NewEventResponse(expected, uint64(i))
186206
expectedResponses[i] = &models.BaseDataProvidersResponse{
187207
Topic: EventsTopic,
188208
Payload: expectedResponsePayload,

0 commit comments

Comments
 (0)