Skip to content

Commit f7fec01

Browse files
authored
delete 'shallow blocks' in block cache when not using firehose (#4691)
* add mechanism to delete 'shallow blocks' in block cache when a chain is not firehose-only * add proper indexes for chainXX.blocks in migration --------- Co-authored-by: Stéphane Duchesneau <stephane.duchesneau@streamingfast.io>
1 parent 019f3db commit f7fec01

File tree

5 files changed

+90
-1
lines changed

5 files changed

+90
-1
lines changed

node/src/main.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,16 @@ async fn main() {
378378
Some(&eth_firehose_only_networks)
379379
};
380380

381+
let eth_firehose_only_network_names = match eth_firehose_only_networks {
382+
Some(firehose_only) => Some(Vec::from_iter(firehose_only.networks.keys())),
383+
None => None,
384+
};
385+
386+
network_store
387+
.block_store()
388+
.cleanup_shallow_blocks(eth_firehose_only_network_names)
389+
.unwrap();
390+
381391
let ethereum_chains = ethereum_networks_as_chains(
382392
&mut blockchain_map,
383393
&logger,
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
drop index ethereum_blocks_nulldata_idx ;
2+
3+
do $$
4+
declare
5+
tables cursor for select namespace
6+
from ethereum_networks
7+
where namespace != 'public';
8+
begin
9+
for table_record in tables loop
10+
execute
11+
'drop index '
12+
|| table_record.namespace
13+
|| '.'
14+
|| 'blocks_nulldata_idx';
15+
end loop;
16+
end;
17+
$$;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
create index if not exists
2+
ethereum_blocks_nulldata_idx
3+
on
4+
ethereum_blocks
5+
using
6+
BTREE ((data->'block'->'data'))
7+
where
8+
data->'block'->'data' = 'null'::jsonb;
9+
10+
do $$
11+
declare
12+
tables cursor for select namespace
13+
from ethereum_networks
14+
where namespace != 'public';
15+
begin
16+
for table_record in tables loop
17+
execute
18+
'create index if not exists blocks_nulldata_idx on '
19+
|| table_record.namespace
20+
|| '.'
21+
|| 'blocks '
22+
|| 'using BTREE ((data->''block''->''data'')) '
23+
|| 'where data->''block''->''data'' = ''null''::jsonb';
24+
end loop;
25+
end;
26+
$$;

store/postgres/src/block_store.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
use graph::{
88
blockchain::ChainIdentifier,
99
components::store::BlockStore as BlockStoreTrait,
10-
prelude::{error, warn, BlockNumber, BlockPtr, Logger, ENV_VARS},
10+
prelude::{error, info, warn, BlockNumber, BlockPtr, Logger, ENV_VARS},
1111
};
1212
use graph::{constraint_violation, prelude::CheapClone};
1313
use graph::{
@@ -442,6 +442,23 @@ impl BlockStore {
442442
Ok(())
443443
}
444444

445+
pub fn cleanup_shallow_blocks(
446+
&self,
447+
firehose_networks: Option<Vec<&String>>,
448+
) -> Result<(), StoreError> {
449+
for store in self.stores.read().unwrap().values() {
450+
if let Some(fh_nets) = firehose_networks.clone() {
451+
if fh_nets.contains(&&store.chain) {
452+
continue;
453+
};
454+
}
455+
456+
info!(&self.logger, "Cleaning shallow blocks on non-firehose chain"; "network" => &store.chain);
457+
store.cleanup_shallow_blocks()?
458+
}
459+
Ok(())
460+
}
461+
445462
fn truncate_block_caches(&self) -> Result<(), StoreError> {
446463
for store in self.stores.read().unwrap().values() {
447464
store.truncate_block_cache()?

store/postgres/src/chain_store.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ mod data {
346346
data jsonb not null
347347
);
348348
create index blocks_number ON {nsp}.blocks using btree(number);
349+
create index blocks_expr_idx ON {nsp}.blocks USING BTREE ((data->'block'->'data')) where data->'block'->'data' = 'null'::jsonb;
349350
350351
create table {nsp}.call_cache (
351352
id bytea not null primary key,
@@ -418,6 +419,18 @@ mod data {
418419
Ok(())
419420
}
420421

422+
pub(super) fn cleanup_shallow_blocks(&self, conn: &PgConnection) -> Result<(), StoreError> {
423+
let table_name = match &self {
424+
Storage::Shared => ETHEREUM_BLOCKS_TABLE_NAME,
425+
Storage::Private(Schema { blocks, .. }) => &blocks.qname,
426+
};
427+
conn.batch_execute(&format!(
428+
"delete from {} WHERE data->'block'->'data' = 'null'::jsonb;",
429+
table_name
430+
))?;
431+
Ok(())
432+
}
433+
421434
/// Insert a block. If the table already contains a block with the
422435
/// same hash, then overwrite that block since it may be adding
423436
/// transaction receipts. If `overwrite` is `true`, overwrite a
@@ -1553,6 +1566,12 @@ impl ChainStore {
15531566
.delete_blocks_by_hash(&conn, &self.chain, block_hashes)
15541567
}
15551568

1569+
pub fn cleanup_shallow_blocks(&self) -> Result<(), StoreError> {
1570+
let conn = self.get_conn()?;
1571+
self.storage.cleanup_shallow_blocks(&conn)?;
1572+
Ok(())
1573+
}
1574+
15561575
pub fn truncate_block_cache(&self) -> Result<(), StoreError> {
15571576
let conn = self.get_conn()?;
15581577
self.storage.truncate_block_cache(&conn)?;

0 commit comments

Comments
 (0)