@@ -95,6 +95,7 @@ import {
9595} from '@hirosystems/api-toolkit' ;
9696import { PgServer , getConnectionArgs , getConnectionConfig } from './connection' ;
9797import { BigNumber } from 'bignumber.js' ;
98+ import { RedisNotifier } from './redis-notifier' ;
9899
99100const MIGRATIONS_TABLE = 'pgmigrations' ;
100101const INSERT_BATCH_SIZE = 500 ;
@@ -130,6 +131,7 @@ type TransactionHeader = {
130131 */
131132export class PgWriteStore extends PgStore {
132133 readonly isEventReplay : boolean ;
134+ protected readonly redisNotifier : RedisNotifier | undefined = undefined ;
133135 protected isIbdBlockHeightReached = false ;
134136 private metrics :
135137 | {
@@ -141,10 +143,12 @@ export class PgWriteStore extends PgStore {
141143 constructor (
142144 sql : PgSqlClient ,
143145 notifier : PgNotifier | undefined = undefined ,
144- isEventReplay : boolean = false
146+ isEventReplay : boolean = false ,
147+ redisNotifier : RedisNotifier | undefined = undefined
145148 ) {
146149 super ( sql , notifier ) ;
147150 this . isEventReplay = isEventReplay ;
151+ this . redisNotifier = redisNotifier ;
148152 if ( isProdEnv ) {
149153 this . metrics = {
150154 blockHeight : new prom . Gauge ( {
@@ -163,11 +167,13 @@ export class PgWriteStore extends PgStore {
163167 usageName,
164168 skipMigrations = false ,
165169 withNotifier = true ,
170+ withRedisNotifier = false ,
166171 isEventReplay = false ,
167172 } : {
168173 usageName : string ;
169174 skipMigrations ?: boolean ;
170175 withNotifier ?: boolean ;
176+ withRedisNotifier ?: boolean ;
171177 isEventReplay ?: boolean ;
172178 } ) : Promise < PgWriteStore > {
173179 const sql = await connectPostgres ( {
@@ -190,7 +196,8 @@ export class PgWriteStore extends PgStore {
190196 } ) ;
191197 }
192198 const notifier = withNotifier ? await PgNotifier . create ( usageName ) : undefined ;
193- const store = new PgWriteStore ( sql , notifier , isEventReplay ) ;
199+ const redisNotifier = withRedisNotifier ? new RedisNotifier ( ) : undefined ;
200+ const store = new PgWriteStore ( sql , notifier , isEventReplay , redisNotifier ) ;
194201 await store . connectPgNotifier ( ) ;
195202 return store ;
196203 }
@@ -229,11 +236,13 @@ export class PgWriteStore extends PgStore {
229236 async update ( data : DataStoreBlockUpdateData ) : Promise < void > {
230237 let garbageCollectedMempoolTxs : string [ ] = [ ] ;
231238 let newTxData : DataStoreTxEventData [ ] = [ ] ;
239+ let reorg : ReOrgUpdatedEntities = newReOrgUpdatedEntities ( ) ;
240+ let isCanonical = true ;
232241
233242 await this . sqlWriteTransaction ( async sql => {
234243 const chainTip = await this . getChainTip ( sql ) ;
235- const reorg = await this . handleReorg ( sql , data . block , chainTip . block_height ) ;
236- const isCanonical = data . block . block_height > chainTip . block_height ;
244+ reorg = await this . handleReorg ( sql , data . block , chainTip . block_height ) ;
245+ isCanonical = data . block . block_height > chainTip . block_height ;
237246 if ( ! isCanonical ) {
238247 markBlockUpdateDataAsNonCanonical ( data ) ;
239248 } else {
@@ -396,6 +405,9 @@ export class PgWriteStore extends PgStore {
396405 }
397406 }
398407 } ) ;
408+ if ( isCanonical ) {
409+ await this . redisNotifier ?. notify ( reorg , data . block . index_block_hash , data . block . block_height ) ;
410+ }
399411 // Do we have an IBD height defined in ENV? If so, check if this block update reached it.
400412 const ibdHeight = getIbdBlockHeight ( ) ;
401413 this . isIbdBlockHeightReached = ibdHeight ? data . block . block_height > ibdHeight : true ;
@@ -3548,6 +3560,13 @@ export class PgWriteStore extends PgStore {
35483560 return result ;
35493561 }
35503562
3563+ /**
3564+ * Recursively restore previously orphaned blocks to canonical.
3565+ * @param sql - The SQL client
3566+ * @param indexBlockHash - The index block hash that we will restore first
3567+ * @param updatedEntities - The updated entities
3568+ * @returns The updated entities
3569+ */
35513570 async restoreOrphanedChain (
35523571 sql : PgSqlClient ,
35533572 indexBlockHash : string ,
@@ -3568,6 +3587,10 @@ export class PgWriteStore extends PgStore {
35683587 throw new Error ( `Found multiple non-canonical parents for index_hash ${ indexBlockHash } ` ) ;
35693588 }
35703589 updatedEntities . markedCanonical . blocks ++ ;
3590+ updatedEntities . markedCanonical . blockHeaders . unshift ( {
3591+ index_block_hash : restoredBlockResult [ 0 ] . index_block_hash ,
3592+ block_height : restoredBlockResult [ 0 ] . block_height ,
3593+ } ) ;
35713594
35723595 // Orphan the now conflicting block at the same height
35733596 const orphanedBlockResult = await sql < BlockQueryResult [ ] > `
@@ -3606,6 +3629,10 @@ export class PgWriteStore extends PgStore {
36063629 }
36073630
36083631 updatedEntities . markedNonCanonical . blocks ++ ;
3632+ updatedEntities . markedNonCanonical . blockHeaders . unshift ( {
3633+ index_block_hash : orphanedBlockResult [ 0 ] . index_block_hash ,
3634+ block_height : orphanedBlockResult [ 0 ] . block_height ,
3635+ } ) ;
36093636 const markNonCanonicalResult = await this . markEntitiesCanonical (
36103637 sql ,
36113638 orphanedBlockResult [ 0 ] . index_block_hash ,
@@ -3662,6 +3689,8 @@ export class PgWriteStore extends PgStore {
36623689 markCanonicalResult . txsMarkedCanonical
36633690 ) ;
36643691 updatedEntities . prunedMempoolTxs += prunedMempoolTxs . removedTxs . length ;
3692+
3693+ // Do we have a parent that is non-canonical? If so, restore it recursively.
36653694 const parentResult = await sql < { index_block_hash : string } [ ] > `
36663695 SELECT index_block_hash
36673696 FROM blocks
@@ -4019,6 +4048,7 @@ export class PgWriteStore extends PgStore {
40194048 if ( this . _debounceMempoolStat . debounce ) {
40204049 clearTimeout ( this . _debounceMempoolStat . debounce ) ;
40214050 }
4051+ await this . redisNotifier ?. close ( ) ;
40224052 await super . close ( args ) ;
40234053 }
40244054}
0 commit comments