Commit c1d12a4

Merge pull request #2358 from CortexFoundation/dev
introduce database sync function
2 parents: 5f9bc74 + 16e1543

20 files changed: +290, -53 lines

core/bench_test.go

Lines changed: 4 additions & 4 deletions
@@ -151,7 +151,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
 	if !disk {
 		db = rawdb.NewMemoryDatabase()
 	} else {
-		pdb, err := pebble.New(b.TempDir(), 128, 128, "", false, true)
+		pdb, err := pebble.New(b.TempDir(), 128, 128, "", false)
 		if err != nil {
 			b.Fatalf("cannot create temporary database: %v", err)
 		}
@@ -249,7 +249,7 @@ func benchWriteChain(b *testing.B, full bool, count uint64) {
 	if err != nil {
 		b.Fatalf("cannot create temporary directory: %v", err)
 	}
-	pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false, true)
+	pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false)
 	if err != nil {
 		b.Fatalf("error opening database: %v", err)
 	}
@@ -267,7 +267,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
 	}
 	defer os.RemoveAll(dir)
 
-	pdb, err := pebble.New(dir, 1024, 128, "", false, true)
+	pdb, err := pebble.New(dir, 1024, 128, "", false)
 	if err != nil {
 		b.Fatalf("error opening database: %v", err)
 	}
@@ -280,7 +280,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
-		pdb, err = pebble.New(dir, 1024, 128, "", false, true)
+		pdb, err = pebble.New(dir, 1024, 128, "", false)
 		if err != nil {
 			b.Fatalf("error opening database: %v", err)
 		}

core/blockchain.go

Lines changed: 105 additions & 16 deletions
@@ -197,6 +197,9 @@ type BlockChain struct {
 	triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
 	gcproc time.Duration // Accumulates canonical block processing for trie dumping
 
+	lastWrite uint64
+	flushInterval atomic.Int64
+
 	// txLookupLimit is the maximum number of blocks from head whose tx indices
 	// are reserved:
 	// * 0: means no limit and regenerate any missing indexes
@@ -806,17 +809,16 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
 		// Ignore the error here since light client won't hit this path
 		frozen, _ := bc.db.Ancients()
 		if num+1 <= frozen {
-			// Truncate all relative data(header, total difficulty, body, receipt
-			// and canonical hash) from ancient store.
-			if _, err := bc.db.TruncateHead(num); err != nil {
-				log.Crit("Failed to truncate ancient data", "number", num, "err", err)
-			}
-			// Remove the hash <-> number mapping from the active store.
-			rawdb.DeleteHeaderNumber(db, hash)
+			// The chain segment, such as the block header, canonical hash,
+			// body, and receipt, will be removed from the ancient store
+			// in one go.
+			//
+			// The hash-to-number mapping in the key-value store will be
+			// removed by the hc.SetHead function.
 		} else {
-			// Remove relative body and receipts from the active store.
-			// The header, total difficulty and canonical hash will be
-			// removed in the hc.SetHead function.
+			// Remove the associated body and receipts from the key-value store.
+			// The header, hash-to-number mapping, and canonical hash will be
+			// removed by the hc.SetHead function.
 			rawdb.DeleteBody(db, hash, num)
 			rawdb.DeleteReceipts(db, hash, num)
 		}
@@ -1237,7 +1239,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 		size += writeSize
 
 		// Sync the ancient store explicitly to ensure all data has been flushed to disk.
-		if err := bc.db.Sync(); err != nil {
+		if err := bc.db.SyncAncient(); err != nil {
 			return 0, err
 		}
 		// Update the current fast block because all block data is now present in DB.
@@ -1366,8 +1368,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 	return 0, nil
 }
 
-var lastWrite uint64
-
 // writeBlockWithoutState writes only the block and its metadata to the database,
 // but does not write any state. This is used to construct competing side forks
 // up to the point where they exceed the canonical total difficulty.
@@ -1475,12 +1475,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 		} else {
 			// If we're exceeding limits but haven't reached a large enough memory gap,
 			// warn the user that the system is becoming unstable.
-			if chosen < lastWrite+state.TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
-				log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/state.TriesInMemory)
+			if chosen < bc.lastWrite+state.TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
+				log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-bc.lastWrite)/state.TriesInMemory)
 			}
 			// Flush an entire trie and restart the counters
 			triedb.Commit(header.Root, true)
-			lastWrite = chosen
+			bc.lastWrite = chosen
 			bc.gcproc = 0
 		}
 	}
@@ -2402,3 +2402,92 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
 	_, err := bc.hc.InsertHeaderChain(chain, start)
 	return 0, err
 }
+
+// InsertHeadersBeforeCutoff inserts the given headers into the ancient store
+// as they are claimed older than the configured chain cutoff point. All the
+// inserted headers are regarded as canonical and chain reorg is not supported.
+func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, error) {
+	if len(headers) == 0 {
+		return 0, nil
+	}
+	// TODO(rjl493456442): Headers before the configured cutoff have already
+	// been verified by the hash of cutoff header. Theoretically, header validation
+	// could be skipped here.
+	if n, err := bc.hc.ValidateHeaderChain(headers, 0); err != nil {
+		return n, err
+	}
+	if !bc.chainmu.TryLock() {
+		return 0, errChainStopped
+	}
+	defer bc.chainmu.Unlock()
+
+	// Initialize the ancient store with genesis block if it's empty.
+	var (
+		frozen, _ = bc.db.Ancients()
+		first = headers[0].Number.Uint64()
+	)
+	if first == 1 && frozen == 0 {
+		_, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, big.NewInt(0))
+		if err != nil {
+			log.Error("Error writing genesis to ancients", "err", err)
+			return 0, err
+		}
+		log.Info("Wrote genesis to ancient store")
+	} else if frozen != first {
+		return 0, fmt.Errorf("headers are gapped with the ancient store, first: %d, ancient: %d", first, frozen)
+	}
+
+	// Write headers to the ancient store, with block bodies and receipts set to nil
+	// to ensure consistency across tables in the freezer.
+	_, err := rawdb.WriteAncientHeaderChain(bc.db, headers)
+	if err != nil {
+		return 0, err
+	}
+	// Sync the ancient store explicitly to ensure all data has been flushed to disk.
+	if err := bc.db.SyncAncient(); err != nil {
+		return 0, err
+	}
+	// Write hash to number mappings
+	batch := bc.db.NewBatch()
+	for _, header := range headers {
+		rawdb.WriteHeaderNumber(batch, header.Hash(), header.Number.Uint64())
+	}
+	// Write head header and head snap block flags
+	last := headers[len(headers)-1]
+	rawdb.WriteHeadHeaderHash(batch, last.Hash())
+	rawdb.WriteHeadFastBlockHash(batch, last.Hash())
+	if err := batch.Write(); err != nil {
+		return 0, err
+	}
+	// Truncate the useless chain segment (zero bodies and receipts) in the
+	// ancient store.
+	if _, err := bc.db.TruncateTail(last.Number.Uint64() + 1); err != nil {
+		return 0, err
+	}
+	// Last step update all in-memory markers
+	bc.hc.currentHeader.Store(last)
+	bc.currentFastBlock.Store(last)
+	headHeaderGauge.Update(last.Number.Int64())
+	headFastBlockGauge.Update(last.Number.Int64())
+	return 0, nil
+}
+
+// SetBlockValidatorAndProcessorForTesting sets the current validator and processor.
+// This method can be used to force an invalid blockchain to be verified for tests.
+// This method is unsafe and should only be used before block import starts.
+func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Processor) {
+	bc.validator = v
+	bc.processor = p
+}
+
+// SetTrieFlushInterval configures how often in-memory tries are persisted to disk.
+// The interval is in terms of block processing time, not wall clock.
+// It is thread-safe and can be called repeatedly without side effects.
+func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
+	bc.flushInterval.Store(int64(interval))
+}
+
+// GetTrieFlushInterval gets the in-memory tries flushAlloc interval
+func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
+	return time.Duration(bc.flushInterval.Load())
+}
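
Taken together, the new blockchain.go entry points give callers two knobs: InsertHeadersBeforeCutoff bulk-imports pre-cutoff headers straight into the ancient store (write the headers, fsync the freezer, then record the key-value markers), while SetTrieFlushInterval and GetTrieFlushInterval tune how much accumulated block-processing time may pass before an in-memory trie is committed. A minimal caller-side sketch, not part of this commit; the wrapper function, package name and the one-hour value are illustrative, and the import paths assume the CortexTheseus module layout:

package example

import (
	"time"

	"github.com/CortexFoundation/CortexTheseus/core"
	"github.com/CortexFoundation/CortexTheseus/core/types"
)

// importAncientSegment is a hypothetical helper showing how the new API might
// be driven while syncing pre-cutoff history.
func importAncientSegment(bc *core.BlockChain, headers []*types.Header) error {
	// Commit tries less eagerly during the bulk import; the interval is
	// measured in block-processing time, not wall-clock time.
	prev := bc.GetTrieFlushInterval()
	bc.SetTrieFlushInterval(time.Hour)
	defer bc.SetTrieFlushInterval(prev)

	// Headers older than the configured cutoff go straight into the ancient
	// store; they are treated as canonical and cannot be reorged afterwards.
	_, err := bc.InsertHeadersBeforeCutoff(headers)
	return err
}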

core/headerchain.go

Lines changed: 34 additions & 1 deletion
@@ -675,18 +675,51 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
 				hashes = append(hashes, hdr.Hash())
 			}
 			for _, hash := range hashes {
+				// Remove the associated block body and receipts if required.
+				//
+				// If the block is in the chain freezer, then this delete operation
+				// is actually ineffective.
 				if delFn != nil {
 					delFn(batch, hash, num)
 				}
+				// Remove the hash->number mapping along with the header itself
 				rawdb.DeleteHeader(batch, hash, num)
 				rawdb.DeleteTd(batch, hash, num)
 			}
+			// Remove the number->hash mapping
 			rawdb.DeleteCanonicalHash(batch, num)
 		}
 	}
 	// Flush all accumulated deletions.
 	if err := batch.Write(); err != nil {
-		log.Crit("Failed to rewind block", "error", err)
+		log.Crit("Failed to commit batch in setHead", "err", err)
+	}
+	// Explicitly flush the pending writes in the key-value store to disk, ensuring
+	// data durability of the previous deletions.
+	if err := hc.chainDb.SyncKeyValue(); err != nil {
+		log.Crit("Failed to sync the key-value store in setHead", "err", err)
+	}
+	// Truncate the excessive chain segments in the ancient store.
+	// These are actually deferred deletions from the loop above.
+	//
+	// This step must be performed after synchronizing the key-value store;
+	// otherwise, in the event of a panic, it's theoretically possible to
+	// lose recent key-value store writes while the ancient store deletions
+	// remain, leading to data inconsistency, e.g., the gap between the key
+	// value store and ancient can be created due to unclean shutdown.
+	if delFn != nil {
+		// Ignore the error here since light client won't hit this path
+		frozen, _ := hc.chainDb.Ancients()
+		header := hc.CurrentHeader()
+
+		// Truncate the excessive chain segment above the current chain head
+		// in the ancient store.
+		if header.Number.Uint64()+1 < frozen {
+			_, err := hc.chainDb.TruncateHead(header.Number.Uint64() + 1)
+			if err != nil {
+				log.Crit("Failed to truncate head block", "err", err)
+			}
+		}
 	}
 	// Clear out any stale content from the caches
 	hc.headerCache.Purge()
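
The ordering here is the point of the change: the batched key-value deletions are committed and made durable with SyncKeyValue before the ancient store is truncated, so an unclean shutdown can at worst leave surplus ancient data behind rather than a gap between the two stores. A standalone sketch of that pattern, assuming a ctxcdb.Database handle; the helper name and arguments are illustrative:

package example

import "github.com/CortexFoundation/CortexTheseus/ctxcdb"

// rewindAncients illustrates the sync-before-truncate ordering used by setHead.
func rewindAncients(db ctxcdb.Database, batch ctxcdb.Batch, newHead uint64) error {
	// 1. Commit the buffered key-value deletions.
	if err := batch.Write(); err != nil {
		return err
	}
	// 2. Make them durable before touching the freezer; a crash between the
	//    two steps then leaves only harmless extra ancient items, never a gap.
	if err := db.SyncKeyValue(); err != nil {
		return err
	}
	// 3. Only now drop the ancient chain segment above the new head.
	if frozen, _ := db.Ancients(); newHead+1 < frozen {
		if _, err := db.TruncateHead(newHead + 1); err != nil {
			return err
		}
	}
	return nil
}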

core/rawdb/accessors_chain.go

Lines changed: 24 additions & 0 deletions
@@ -815,6 +815,30 @@ func writeAncientBlock(op ctxcdb.AncientWriteOp, block *types.Block, header *typ
 	return nil
 }
 
+// WriteAncientHeaderChain writes the supplied headers along with nil block
+// bodies and receipts into the ancient store. It's supposed to be used for
+// storing chain segment before the chain cutoff.
+func WriteAncientHeaderChain(db ctxcdb.AncientWriter, headers []*types.Header) (int64, error) {
+	return db.ModifyAncients(func(op ctxcdb.AncientWriteOp) error {
+		for _, header := range headers {
+			num := header.Number.Uint64()
+			if err := op.AppendRaw(ChainFreezerHashTable, num, header.Hash().Bytes()); err != nil {
+				return fmt.Errorf("can't add block %d hash: %v", num, err)
+			}
+			if err := op.Append(ChainFreezerHeaderTable, num, header); err != nil {
+				return fmt.Errorf("can't append block header %d: %v", num, err)
+			}
+			if err := op.AppendRaw(ChainFreezerBodiesTable, num, nil); err != nil {
+				return fmt.Errorf("can't append block body %d: %v", num, err)
+			}
+			if err := op.AppendRaw(ChainFreezerReceiptTable, num, nil); err != nil {
+				return fmt.Errorf("can't append block %d receipts: %v", num, err)
+			}
+		}
+		return nil
+	})
+}
+
 // DeleteBlock removes all block data associated with a hash.
 func DeleteBlock(db ctxcdb.KeyValueWriter, hash common.Hash, number uint64) {
 	DeleteReceipts(db, hash, number)
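
WriteAncientHeaderChain appends to all four chain-freezer tables for every header, writing nil bodies and receipts so the tables stay index-aligned. A sketch of how a caller might pair it with the new SyncAncient, mirroring InsertHeadersBeforeCutoff above; the wrapper and its error handling are illustrative and the import paths assume the CortexTheseus module layout:

package example

import (
	"github.com/CortexFoundation/CortexTheseus/core/rawdb"
	"github.com/CortexFoundation/CortexTheseus/core/types"
	"github.com/CortexFoundation/CortexTheseus/ctxcdb"
)

// writeHeaderOnlySegment appends a header-only chain segment and flushes the
// freezer tables before the data is relied upon.
func writeHeaderOnlySegment(db ctxcdb.Database, headers []*types.Header) error {
	if _, err := rawdb.WriteAncientHeaderChain(db, headers); err != nil {
		return err
	}
	// The append is buffered by the freezer tables; sync explicitly to disk.
	return db.SyncAncient()
}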

core/rawdb/chain_freezer.go

Lines changed: 1 addition & 1 deletion
@@ -205,7 +205,7 @@ func (f *chainFreezer) freeze(db ctxcdb.KeyValueStore) {
 			continue
 		}
 		// Batch of blocks have been frozen, flush them before wiping from key-value store
-		if err := f.Sync(); err != nil {
+		if err := f.SyncAncient(); err != nil {
 			log.Crit("Failed to flush frozen tables", "err", err)
 		}
 		// Wipe out all data from the active database

core/rawdb/database.go

Lines changed: 2 additions & 2 deletions
@@ -131,8 +131,8 @@ func (db *nofreezedb) TruncateTail(items uint64) (uint64, error) {
 	return 0, errNotSupported
 }
 
-// Sync returns an error as we don't have a backing chain freezer.
-func (db *nofreezedb) Sync() error {
+// SyncAncient returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) SyncAncient() error {
 	return errNotSupported
 }
 
core/rawdb/freezer.go

Lines changed: 2 additions & 2 deletions
@@ -324,8 +324,8 @@ func (f *Freezer) TruncateTail(tail uint64) (uint64, error) {
 	return old, nil
 }
 
-// Sync flushes all data tables to disk.
-func (f *Freezer) Sync() error {
+// SyncAncient flushes all data tables to disk.
+func (f *Freezer) SyncAncient() error {
 	var errs []error
 	for _, table := range f.tables {
 		if err := table.Sync(); err != nil {

core/rawdb/freezer_memory.go

Lines changed: 2 additions & 2 deletions
@@ -389,8 +389,8 @@ func (f *MemoryFreezer) TruncateTail(tail uint64) (uint64, error) {
 	return old, nil
 }
 
-// Sync flushes all data tables to disk.
-func (f *MemoryFreezer) Sync() error {
+// SyncAncient flushes all data tables to disk.
+func (f *MemoryFreezer) SyncAncient() error {
 	return nil
 }
 
core/rawdb/freezer_resettable.go

Lines changed: 3 additions & 3 deletions
@@ -194,12 +194,12 @@ func (f *resettableFreezer) TruncateTail(tail uint64) (uint64, error) {
 	return f.freezer.TruncateTail(tail)
 }
 
-// Sync flushes all data tables to disk.
-func (f *resettableFreezer) Sync() error {
+// SyncAncient flushes all data tables to disk.
+func (f *resettableFreezer) SyncAncient() error {
 	f.lock.RLock()
 	defer f.lock.RUnlock()
 
-	return f.freezer.Sync()
+	return f.freezer.SyncAncient()
 }
 
 // AncientDatadir returns the path of the ancient store.

core/rawdb/freezer_test.go

Lines changed: 1 addition & 1 deletion
@@ -474,7 +474,7 @@ func TestFreezerCloseSync(t *testing.T) {
 	if err := f.Close(); err != nil {
 		t.Fatal(err)
 	}
-	if err := f.Sync(); err == nil {
+	if err := f.SyncAncient(); err == nil {
 		t.Fatalf("want error, have nil")
 	} else if have, want := err.Error(), "[closed closed]"; have != want {
 		t.Fatalf("want %v, have %v", have, want)

core/rawdb/table.go

Lines changed: 9 additions & 3 deletions
@@ -107,10 +107,10 @@ func (t *table) TruncateTail(items uint64) (uint64, error) {
 	return t.db.TruncateTail(items)
 }
 
-// Sync is a noop passthrough that just forwards the request to the underlying
+// SyncAncient is a noop passthrough that just forwards the request to the underlying
 // database.
-func (t *table) Sync() error {
-	return t.db.Sync()
+func (t *table) SyncAncient() error {
+	return t.db.SyncAncient()
 }
 
 // AncientDatadir returns the ancient datadir of the underlying database.
@@ -188,6 +188,12 @@ func (t *table) Compact(start []byte, limit []byte) error {
 	return t.db.Compact(start, limit)
 }
 
+// SyncKeyValue ensures that all pending writes are flushed to disk,
+// guaranteeing data durability up to the point.
+func (t *table) SyncKeyValue() error {
+	return t.db.SyncKeyValue()
+}
+
 // NewBatch creates a write-only database that buffers changes to its host db
 // until a final write is called, each operation prefixing all keys with the
 // pre-configured string.
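
With this change the prefixed table wrapper exposes both halves of the split sync API and simply forwards them: SyncAncient flushes the freezer tables, SyncKeyValue flushes the key-value backend. A small sketch of calling both on any ctxcdb.Database; the helper name is illustrative:

package example

import "github.com/CortexFoundation/CortexTheseus/ctxcdb"

// flushAll makes both halves of the database durable: pending key-value
// writes first, then the append-only freezer tables.
func flushAll(db ctxcdb.Database) error {
	if err := db.SyncKeyValue(); err != nil {
		return err
	}
	return db.SyncAncient()
}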
