Skip to content

Commit d573102

Browse files
teddydingjonfung-dydx
authored andcommitted
[OTE-456] FNS x OE: stage FinalizeBlock events and emit in Precommit (#2253)
1 parent 9f2846e commit d573102

File tree

17 files changed

+1118
-290
lines changed

17 files changed

+1118
-290
lines changed

indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts

Lines changed: 234 additions & 232 deletions
Large diffs are not rendered by default.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { StreamOrderbookFill, StreamOrderbookFillSDKType } from "./query";
2+
import { StreamSubaccountUpdate, StreamSubaccountUpdateSDKType } from "../subaccounts/streaming";
3+
import * as _m0 from "protobufjs/minimal";
4+
import { DeepPartial } from "../../helpers";
5+
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */
6+
7+
export interface StagedFinalizeBlockEvent {
8+
orderFill?: StreamOrderbookFill;
9+
subaccountUpdate?: StreamSubaccountUpdate;
10+
}
11+
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */
12+
13+
export interface StagedFinalizeBlockEventSDKType {
14+
order_fill?: StreamOrderbookFillSDKType;
15+
subaccount_update?: StreamSubaccountUpdateSDKType;
16+
}
17+
18+
function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent {
19+
return {
20+
orderFill: undefined,
21+
subaccountUpdate: undefined
22+
};
23+
}
24+
25+
export const StagedFinalizeBlockEvent = {
26+
encode(message: StagedFinalizeBlockEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
27+
if (message.orderFill !== undefined) {
28+
StreamOrderbookFill.encode(message.orderFill, writer.uint32(10).fork()).ldelim();
29+
}
30+
31+
if (message.subaccountUpdate !== undefined) {
32+
StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim();
33+
}
34+
35+
return writer;
36+
},
37+
38+
decode(input: _m0.Reader | Uint8Array, length?: number): StagedFinalizeBlockEvent {
39+
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
40+
let end = length === undefined ? reader.len : reader.pos + length;
41+
const message = createBaseStagedFinalizeBlockEvent();
42+
43+
while (reader.pos < end) {
44+
const tag = reader.uint32();
45+
46+
switch (tag >>> 3) {
47+
case 1:
48+
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
49+
break;
50+
51+
case 2:
52+
message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32());
53+
break;
54+
55+
default:
56+
reader.skipType(tag & 7);
57+
break;
58+
}
59+
}
60+
61+
return message;
62+
},
63+
64+
fromPartial(object: DeepPartial<StagedFinalizeBlockEvent>): StagedFinalizeBlockEvent {
65+
const message = createBaseStagedFinalizeBlockEvent();
66+
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
67+
message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined;
68+
return message;
69+
}
70+
71+
};
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
import * as _117 from "./gogo";
2-
export const gogoproto = { ..._117
1+
import * as _118 from "./gogo";
2+
export const gogoproto = { ..._118
33
};
Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
import * as _118 from "./api/annotations";
2-
import * as _119 from "./api/http";
3-
import * as _120 from "./protobuf/descriptor";
4-
import * as _121 from "./protobuf/duration";
5-
import * as _122 from "./protobuf/timestamp";
6-
import * as _123 from "./protobuf/any";
1+
import * as _119 from "./api/annotations";
2+
import * as _120 from "./api/http";
3+
import * as _121 from "./protobuf/descriptor";
4+
import * as _122 from "./protobuf/duration";
5+
import * as _123 from "./protobuf/timestamp";
6+
import * as _124 from "./protobuf/any";
77
export namespace google {
8-
export const api = { ..._118,
9-
..._119
8+
export const api = { ..._119,
9+
..._120
1010
};
11-
export const protobuf = { ..._120,
12-
..._121,
11+
export const protobuf = { ..._121,
1312
..._122,
14-
..._123
13+
..._123,
14+
..._124
1515
};
1616
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
syntax = "proto3";
2+
package dydxprotocol.clob;
3+
4+
import "dydxprotocol/subaccounts/streaming.proto";
5+
import "dydxprotocol/clob/query.proto";
6+
7+
option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";
8+
9+
// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`.
10+
message StagedFinalizeBlockEvent {
11+
// Contains one of StreamOrderbookFill, StreamSubaccountUpdate.
12+
oneof event {
13+
StreamOrderbookFill order_fill = 1;
14+
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
15+
}
16+
}

protocol/app/app.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ func New(
467467
statsmoduletypes.TransientStoreKey,
468468
rewardsmoduletypes.TransientStoreKey,
469469
indexer_manager.TransientStoreKey,
470+
streaming.StreamingManagerTransientStoreKey,
470471
perpetualsmoduletypes.TransientStoreKey,
471472
)
472473
memKeys := storetypes.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, clobmoduletypes.MemStoreKey)
@@ -762,6 +763,7 @@ func New(
762763
appFlags,
763764
appCodec,
764765
logger,
766+
tkeys[streaming.StreamingManagerTransientStoreKey],
765767
)
766768

767769
timeProvider := &timelib.TimeProviderImpl{}
@@ -2029,6 +2031,7 @@ func getFullNodeStreamingManagerFromOptions(
20292031
appFlags flags.Flags,
20302032
cdc codec.Codec,
20312033
logger log.Logger,
2034+
streamingManagerTransientStoreKey storetypes.StoreKey,
20322035
) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) {
20332036
logger = logger.With(log.ModuleKey, "full-node-streaming")
20342037
if appFlags.GrpcStreamingEnabled {
@@ -2042,6 +2045,7 @@ func getFullNodeStreamingManagerFromOptions(
20422045
appFlags.GrpcStreamingMaxBatchSize,
20432046
appFlags.GrpcStreamingMaxChannelBufferSize,
20442047
appFlags.FullNodeStreamingSnapshotInterval,
2048+
streamingManagerTransientStoreKey,
20452049
)
20462050

20472051
// Start websocket server.

protocol/lib/metrics/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ const (
202202
UpdateType = "update_type"
203203
ValidateMatches = "validate_matches"
204204
ValidateOrder = "validate_order"
205+
StreamBatchUpdatesAfterFinalizeBlock = "stream_batch_updates_after_finalize_block"
205206

206207
// MemCLOB.
207208
AddedToOrderBook = "added_to_orderbook"

protocol/lib/metrics/metric_keys.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,22 @@ const (
6666
GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency"
6767

6868
// Full node grpc
69-
FullNodeGrpc = "full_node_grpc"
70-
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
71-
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
72-
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
73-
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
74-
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
75-
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
76-
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
77-
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
78-
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
79-
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
80-
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
81-
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
69+
FullNodeGrpc = "full_node_grpc"
70+
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
71+
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
72+
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
73+
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
74+
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
75+
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
76+
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
77+
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
78+
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
79+
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
80+
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
81+
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
82+
GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count"
83+
GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count"
84+
GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"
8285

8386
EndBlocker = "end_blocker"
8487
EndBlockerLag = "end_blocker_lag"

protocol/streaming/constants.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package streaming
2+
3+
// Constants for FullNodeStreamingManager.
4+
const (
5+
// Transient store key for storing staged events.
6+
StreamingManagerTransientStoreKey = "tmp_streaming"
7+
8+
// Key for storing the count of staged events.
9+
StagedEventsCountKey = "EvtCnt"
10+
11+
// Key prefix for staged events.
12+
StagedEventsKeyPrefix = "Evt:"
13+
)

0 commit comments

Comments
 (0)