@@ -490,19 +490,11 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.
490490 return exists
491491}
492492
493- // SendOrderbookUpdates groups updates by their clob pair ids and
494- // sends messages to the subscribers.
495- func (sm * FullNodeStreamingManagerImpl ) SendOrderbookUpdates (
493+ func getStreamUpdatesFromOffchainUpdates (
496494 offchainUpdates * clobtypes.OffchainUpdates ,
497495 blockHeight uint32 ,
498496 execMode sdk.ExecMode ,
499- ) {
500- defer metrics .ModuleMeasureSince (
501- metrics .FullNodeGrpc ,
502- metrics .GrpcSendOrderbookUpdatesLatency ,
503- time .Now (),
504- )
505-
497+ ) (streamUpdates []clobtypes.StreamUpdate , clobPairIds []uint32 ) {
506498 // Group updates by clob pair id.
507499 updates := make (map [uint32 ]* clobtypes.OffchainUpdates )
508500 for _ , message := range offchainUpdates .Messages {
@@ -514,8 +506,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
514506 }
515507
516508 // Unmarshal each per-clob pair message to v1 updates.
517- streamUpdates : = make ([]clobtypes.StreamUpdate , 0 )
518- clobPairIds : = make ([]uint32 , 0 )
509+ streamUpdates = make ([]clobtypes.StreamUpdate , 0 )
510+ clobPairIds = make ([]uint32 , 0 )
519511 for clobPairId , update := range updates {
520512 v1updates , err := streaming_util .GetOffchainUpdatesV1 (update )
521513 if err != nil {
@@ -535,26 +527,39 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
535527 clobPairIds = append (clobPairIds , clobPairId )
536528 }
537529
538- sm . AddOrderUpdatesToCache ( streamUpdates , clobPairIds )
530+ return streamUpdates , clobPairIds
539531}
540532
541- // SendOrderbookFillUpdates groups fills by their clob pair ids and
533+ // SendOrderbookUpdates groups updates by their clob pair ids and
542534// sends messages to the subscribers.
543- func (sm * FullNodeStreamingManagerImpl ) SendOrderbookFillUpdates (
544- orderbookFills [] clobtypes.StreamOrderbookFill ,
535+ func (sm * FullNodeStreamingManagerImpl ) SendOrderbookUpdates (
536+ offchainUpdates * clobtypes.OffchainUpdates ,
545537 blockHeight uint32 ,
546538 execMode sdk.ExecMode ,
547- perpetualIdToClobPairId map [uint32 ][]clobtypes.ClobPairId ,
548539) {
549540 defer metrics .ModuleMeasureSince (
550541 metrics .FullNodeGrpc ,
551- metrics .GrpcSendOrderbookFillsLatency ,
542+ metrics .GrpcSendOrderbookUpdatesLatency ,
552543 time .Now (),
553544 )
554545
546+ streamUpdates , clobPairIds := getStreamUpdatesFromOffchainUpdates (offchainUpdates , blockHeight , execMode )
547+
548+ sm .AddOrderUpdatesToCache (streamUpdates , clobPairIds )
549+ }
550+
551+ func (sm * FullNodeStreamingManagerImpl ) getStreamUpdatesForOrderbookFills (
552+ orderbookFills []clobtypes.StreamOrderbookFill ,
553+ blockHeight uint32 ,
554+ execMode sdk.ExecMode ,
555+ perpetualIdToClobPairId map [uint32 ][]clobtypes.ClobPairId ,
556+ ) (
557+ streamUpdates []clobtypes.StreamUpdate ,
558+ clobPairIds []uint32 ,
559+ ) {
555560 // Group fills by clob pair id.
556- streamUpdates : = make ([]clobtypes.StreamUpdate , 0 )
557- clobPairIds : = make ([]uint32 , 0 )
561+ streamUpdates = make ([]clobtypes.StreamUpdate , 0 )
562+ clobPairIds = make ([]uint32 , 0 )
558563 for _ , orderbookFill := range orderbookFills {
559564 // If this is a deleveraging fill, fetch the clob pair id from the deleveraged
560565 // perpetual id.
@@ -577,6 +582,29 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
577582 streamUpdates = append (streamUpdates , streamUpdate )
578583 clobPairIds = append (clobPairIds , clobPairId )
579584 }
585+ return streamUpdates , clobPairIds
586+ }
587+
588+ // SendOrderbookFillUpdates groups fills by their clob pair ids and
589+ // sends messages to the subscribers.
590+ func (sm * FullNodeStreamingManagerImpl ) SendOrderbookFillUpdates (
591+ orderbookFills []clobtypes.StreamOrderbookFill ,
592+ blockHeight uint32 ,
593+ execMode sdk.ExecMode ,
594+ perpetualIdToClobPairId map [uint32 ][]clobtypes.ClobPairId ,
595+ ) {
596+ defer metrics .ModuleMeasureSince (
597+ metrics .FullNodeGrpc ,
598+ metrics .GrpcSendOrderbookFillsLatency ,
599+ time .Now (),
600+ )
601+
602+ streamUpdates , clobPairIds := sm .getStreamUpdatesForOrderbookFills (
603+ orderbookFills ,
604+ blockHeight ,
605+ execMode ,
606+ perpetualIdToClobPairId ,
607+ )
580608
581609 sm .AddOrderUpdatesToCache (streamUpdates , clobPairIds )
582610}
@@ -609,6 +637,31 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
609637 )
610638}
611639
640+ func getStreamUpdatesForSubaccountUpdates (
641+ subaccountUpdates []satypes.StreamSubaccountUpdate ,
642+ blockHeight uint32 ,
643+ execMode sdk.ExecMode ,
644+ ) (
645+ streamUpdates []clobtypes.StreamUpdate ,
646+ subaccountIds []* satypes.SubaccountId ,
647+ ) {
648+ // Group subaccount updates by subaccount id.
649+ streamUpdates = make ([]clobtypes.StreamUpdate , 0 )
650+ subaccountIds = make ([]* satypes.SubaccountId , 0 )
651+ for _ , subaccountUpdate := range subaccountUpdates {
652+ streamUpdate := clobtypes.StreamUpdate {
653+ UpdateMessage : & clobtypes.StreamUpdate_SubaccountUpdate {
654+ SubaccountUpdate : & subaccountUpdate ,
655+ },
656+ BlockHeight : blockHeight ,
657+ ExecMode : uint32 (execMode ),
658+ }
659+ streamUpdates = append (streamUpdates , streamUpdate )
660+ subaccountIds = append (subaccountIds , subaccountUpdate .SubaccountId )
661+ }
662+ return streamUpdates , subaccountIds
663+ }
664+
612665// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
613666// sends messages to the subscribers.
614667func (sm * FullNodeStreamingManagerImpl ) SendFinalizedSubaccountUpdates (
@@ -626,20 +679,11 @@ func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
626679 panic ("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize" )
627680 }
628681
629- // Group subaccount updates by subaccount id.
630- streamUpdates := make ([]clobtypes.StreamUpdate , 0 )
631- subaccountIds := make ([]* satypes.SubaccountId , 0 )
632- for _ , subaccountUpdate := range subaccountUpdates {
633- streamUpdate := clobtypes.StreamUpdate {
634- UpdateMessage : & clobtypes.StreamUpdate_SubaccountUpdate {
635- SubaccountUpdate : & subaccountUpdate ,
636- },
637- BlockHeight : blockHeight ,
638- ExecMode : uint32 (execMode ),
639- }
640- streamUpdates = append (streamUpdates , streamUpdate )
641- subaccountIds = append (subaccountIds , subaccountUpdate .SubaccountId )
642- }
682+ streamUpdates , subaccountIds := getStreamUpdatesForSubaccountUpdates (
683+ subaccountUpdates ,
684+ blockHeight ,
685+ execMode ,
686+ )
643687
644688 sm .AddSubaccountUpdatesToCache (streamUpdates , subaccountIds )
645689}
@@ -796,6 +840,47 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
796840 return ret
797841}
798842
843+ // addBatchUpdatesToCacheWithLock adds batched updates to the cache.
844+ // Used by `StreamBatchUpdatesAfterFinalizeBlock` to batch orderbook, fill
845+ // and subaccount updates in a single stream.
846+ // Note this method requires the lock and assumes that the lock has already been
847+ // acquired by the caller.
848+ func (sm * FullNodeStreamingManagerImpl ) addBatchUpdatesToCacheWithLock (
849+ orderbookStreamUpdates []clobtypes.StreamUpdate ,
850+ orderbookClobPairIds []uint32 ,
851+ fillStreamUpdates []clobtypes.StreamUpdate ,
852+ fillClobPairIds []uint32 ,
853+ subaccountStreamUpdates []clobtypes.StreamUpdate ,
854+ subaccountIds []* satypes.SubaccountId ,
855+ ) {
856+ // Add orderbook updates to cache.
857+ sm .streamUpdateCache = append (sm .streamUpdateCache , orderbookStreamUpdates ... )
858+ for _ , clobPairId := range orderbookClobPairIds {
859+ sm .streamUpdateSubscriptionCache = append (
860+ sm .streamUpdateSubscriptionCache ,
861+ sm .clobPairIdToSubscriptionIdMapping [clobPairId ],
862+ )
863+ }
864+
865+ // Add fill updates to cache.
866+ sm .streamUpdateCache = append (sm .streamUpdateCache , fillStreamUpdates ... )
867+ for _ , clobPairId := range fillClobPairIds {
868+ sm .streamUpdateSubscriptionCache = append (
869+ sm .streamUpdateSubscriptionCache ,
870+ sm .clobPairIdToSubscriptionIdMapping [clobPairId ],
871+ )
872+ }
873+
874+ // Add subaccount updates to cache.
875+ sm .streamUpdateCache = append (sm .streamUpdateCache , subaccountStreamUpdates ... )
876+ for _ , subaccountId := range subaccountIds {
877+ sm .streamUpdateSubscriptionCache = append (
878+ sm .streamUpdateSubscriptionCache ,
879+ sm .subaccountIdToSubscriptionIdMapping [* subaccountId ],
880+ )
881+ }
882+ }
883+
799884// Grpc Streaming logic after consensus agrees on a block.
800885// - Stream all events staged during `FinalizeBlock`.
801886// - Stream orderbook updates to sync fills in local ops queue.
@@ -804,33 +889,45 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
804889 orderBookUpdatesToSyncLocalOpsQueue * clobtypes.OffchainUpdates ,
805890 perpetualIdToClobPairId map [uint32 ][]clobtypes.ClobPairId ,
806891) {
807- // Flush all pending updates, since we want the onchain updates to arrive in a batch.
808- sm .FlushStreamUpdates ()
809-
810892 finalizedFills , finalizedSubaccountUpdates := sm .getStagedEventsFromFinalizeBlock (ctx )
811893
812- // TODO(CT-1190): Stream below in a single batch.
813- // Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock.
814- sm .SendOrderbookUpdates (
894+ orderbookStreamUpdates , orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates (
815895 orderBookUpdatesToSyncLocalOpsQueue ,
816896 uint32 (ctx .BlockHeight ()),
817897 ctx .ExecMode (),
818898 )
819899
820- // Send finalized fills from FinalizeBlock.
821- sm .SendOrderbookFillUpdates (
900+ fillStreamUpdates , fillClobPairIds := sm .getStreamUpdatesForOrderbookFills (
822901 finalizedFills ,
823902 uint32 (ctx .BlockHeight ()),
824903 ctx .ExecMode (),
825904 perpetualIdToClobPairId ,
826905 )
827906
828- // Send finalized subaccount updates from FinalizeBlock.
829- sm .SendFinalizedSubaccountUpdates (
907+ subaccountStreamUpdates , subaccountIds := getStreamUpdatesForSubaccountUpdates (
830908 finalizedSubaccountUpdates ,
831909 uint32 (ctx .BlockHeight ()),
832910 ctx .ExecMode (),
833911 )
912+
913+ sm .Lock ()
914+ defer sm .Unlock ()
915+
916+ // Flush all pending updates, since we want the onchain updates to arrive in a batch.
917+ sm .FlushStreamUpdatesWithLock ()
918+
919+ sm .addBatchUpdatesToCacheWithLock (
920+ orderbookStreamUpdates ,
921+ orderbookClobPairIds ,
922+ fillStreamUpdates ,
923+ fillClobPairIds ,
924+ subaccountStreamUpdates ,
925+ subaccountIds ,
926+ )
927+
928+ // Emit all stream updates in a single batch.
929+ // Note we still have the lock, which is released right before function returns.
930+ sm .FlushStreamUpdatesWithLock ()
834931}
835932
836933// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.
0 commit comments