Skip to content

Commit 415b97a

Browse files
committed
[OTE-823] Fix FNS onchain events staging + retrieval logic (#2318)
1 parent cb62cd6 commit 415b97a

File tree

5 files changed

+38
-15
lines changed

5 files changed

+38
-15
lines changed

protocol/app/app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2046,6 +2046,7 @@ func getFullNodeStreamingManagerFromOptions(
20462046
appFlags.GrpcStreamingMaxChannelBufferSize,
20472047
appFlags.FullNodeStreamingSnapshotInterval,
20482048
streamingManagerTransientStoreKey,
2049+
cdc,
20492050
)
20502051

20512052
// Start websocket server.

protocol/app/flags/flags.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,6 @@ func (f *Flags) Validate() error {
163163

164164
// Grpc streaming
165165
if f.GrpcStreamingEnabled {
166-
if f.OptimisticExecutionEnabled {
167-
// TODO(OTE-456): Finish gRPC streaming x OE integration.
168-
return fmt.Errorf("grpc streaming cannot be enabled together with optimistic execution")
169-
}
170166
if !f.GrpcEnable {
171167
return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server")
172168
}

protocol/app/flags/flags_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,17 @@ func TestValidate(t *testing.T) {
113113
OptimisticExecutionEnabled: true,
114114
},
115115
},
116-
"failure - optimistic execution cannot be enabled with gRPC streaming": {
116+
"success - optimistic execution canbe enabled with gRPC streaming": {
117117
flags: flags.Flags{
118-
NonValidatingFullNode: false,
119-
GrpcEnable: true,
120-
GrpcStreamingEnabled: true,
121-
OptimisticExecutionEnabled: true,
118+
NonValidatingFullNode: false,
119+
GrpcEnable: true,
120+
GrpcStreamingEnabled: true,
121+
OptimisticExecutionEnabled: true,
122+
GrpcStreamingMaxBatchSize: 2000,
123+
GrpcStreamingFlushIntervalMs: 100,
124+
GrpcStreamingMaxChannelBufferSize: 2000,
125+
WebsocketStreamingPort: 8989,
122126
},
123-
expectedErr: fmt.Errorf("grpc streaming cannot be enabled together with optimistic execution"),
124127
},
125128
"failure - gRPC disabled": {
126129
flags: flags.Flags{

protocol/streaming/full_node_streaming_manager.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"cosmossdk.io/log"
1414
"cosmossdk.io/store/prefix"
1515
storetypes "cosmossdk.io/store/types"
16+
"github.com/cosmos/cosmos-sdk/codec"
1617
sdk "github.com/cosmos/cosmos-sdk/types"
1718
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
1819
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
@@ -27,6 +28,7 @@ var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
2728
type FullNodeStreamingManagerImpl struct {
2829
sync.Mutex
2930

31+
cdc codec.BinaryCodec
3032
logger log.Logger
3133

3234
// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
@@ -95,6 +97,7 @@ func NewFullNodeStreamingManager(
9597
maxSubscriptionChannelSize uint32,
9698
snapshotBlockInterval uint32,
9799
streamingManagerTransientStoreKey storetypes.StoreKey,
100+
cdc codec.BinaryCodec,
98101
) *FullNodeStreamingManagerImpl {
99102
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
100103
logger: logger,
@@ -113,6 +116,7 @@ func NewFullNodeStreamingManager(
113116
snapshotBlockInterval: snapshotBlockInterval,
114117

115118
streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
119+
cdc: cdc,
116120
}
117121

118122
// Start the goroutine for pushing order updates through.
@@ -391,14 +395,15 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
391395
ctx sdk.Context,
392396
subaccountUpdate satypes.StreamSubaccountUpdate,
393397
) {
398+
lib.AssertDeliverTxMode(ctx)
394399
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
395400
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
396401
SubaccountUpdate: &subaccountUpdate,
397402
},
398403
}
399404
sm.stageFinalizeBlockEvent(
400405
ctx,
401-
clobtypes.Amino.MustMarshal(stagedEvent),
406+
sm.cdc.MustMarshal(&stagedEvent),
402407
)
403408
}
404409

@@ -411,25 +416,30 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
411416
ctx sdk.Context,
412417
fill clobtypes.StreamOrderbookFill,
413418
) {
419+
lib.AssertDeliverTxMode(ctx)
414420
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
415421
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
416422
OrderFill: &fill,
417423
},
418424
}
425+
419426
sm.stageFinalizeBlockEvent(
420427
ctx,
421-
clobtypes.Amino.MustMarshal(stagedEvent),
428+
sm.cdc.MustMarshal(&stagedEvent),
422429
)
423430
}
424431

425-
func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent {
432+
func getStagedFinalizeBlockEventsFromStore(
433+
store storetypes.KVStore,
434+
cdc codec.BinaryCodec,
435+
) []clobtypes.StagedFinalizeBlockEvent {
426436
count := getStagedEventsCount(store)
427437
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
428438
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
429439
for i := uint32(0); i < count; i++ {
430440
var event clobtypes.StagedFinalizeBlockEvent
431441
bytes := store.Get(lib.Uint32ToKey(i))
432-
clobtypes.Amino.MustUnmarshal(bytes, &event)
442+
cdc.MustUnmarshal(bytes, &event)
433443
events[i] = event
434444
}
435445
return events
@@ -441,7 +451,7 @@ func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
441451
) []clobtypes.StagedFinalizeBlockEvent {
442452
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
443453
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
444-
return getStagedFinalizeBlockEvents(store)
454+
return getStagedFinalizeBlockEventsFromStore(store, sm.cdc)
445455
}
446456

447457
func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
@@ -889,6 +899,9 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
889899
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
890900
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
891901
) {
902+
// Prevent gas metering from state read.
903+
ctx = ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
904+
892905
finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)
893906

894907
orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates(

protocol/x/clob/module.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,16 @@ func (am AppModule) PreBlock(ctx context.Context) (appmodule.ResponsePreBlock, e
177177
}, nil
178178
}
179179

180+
// BeginBlock executes all ABCI BeginBlock logic respective to the clob module.
181+
func (am AppModule) Precommit(ctx context.Context) error {
182+
defer telemetry.ModuleMeasureSince(am.Name(), time.Now(), telemetry.MetricKeyPrecommiter)
183+
Precommit(
184+
lib.UnwrapSDKContext(ctx, types.ModuleName),
185+
*am.keeper,
186+
)
187+
return nil
188+
}
189+
180190
// BeginBlock executes all ABCI BeginBlock logic respective to the clob module.
181191
func (am AppModule) BeginBlock(ctx context.Context) error {
182192
defer telemetry.ModuleMeasureSince(am.Name(), time.Now(), telemetry.MetricKeyBeginBlocker)

0 commit comments

Comments
 (0)