Skip to content

core, ethdb: introduce database sync function #31703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
if !disk {
db = rawdb.NewMemoryDatabase()
} else {
pdb, err := pebble.New(b.TempDir(), 128, 128, "", false, true)
pdb, err := pebble.New(b.TempDir(), 128, 128, "", false)
if err != nil {
b.Fatalf("cannot create temporary database: %v", err)
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func makeChainForBench(db ethdb.Database, genesis *Genesis, full bool, count uin
func benchWriteChain(b *testing.B, full bool, count uint64) {
genesis := &Genesis{Config: params.AllEthashProtocolChanges}
for i := 0; i < b.N; i++ {
pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false, true)
pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}
Expand All @@ -316,7 +316,7 @@ func benchWriteChain(b *testing.B, full bool, count uint64) {
func benchReadChain(b *testing.B, full bool, count uint64) {
dir := b.TempDir()

pdb, err := pebble.New(dir, 1024, 128, "", false, true)
pdb, err := pebble.New(dir, 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}
Expand All @@ -332,7 +332,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
pdb, err = pebble.New(dir, 1024, 128, "", false, true)
pdb, err = pebble.New(dir, 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}
Expand Down
24 changes: 12 additions & 12 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,17 +979,16 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if _, err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
// The chain segment, such as the block header, canonical hash,
// body, and receipt, will be removed from the ancient store
// in one go.
//
// The hash-to-number mapping in the key-value store will be
// removed by the hc.SetHead function.
} else {
// Remove relative body and receipts from the active store.
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
// Remove the associated body and receipts from the key-value store.
// The header, hash-to-number mapping, and canonical hash will be
// removed by the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
Expand Down Expand Up @@ -1361,7 +1360,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += writeSize

// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
if err := bc.db.SyncAncient(); err != nil {
return 0, err
}
// Write hash to number mappings
Expand Down Expand Up @@ -2627,7 +2626,8 @@ func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, e
if err != nil {
return 0, err
}
if err := bc.db.Sync(); err != nil {
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.SyncAncient(); err != nil {
return 0, err
}
// Write hash to number mappings
Expand Down
8 changes: 4 additions & 4 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1765,7 +1765,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")

pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
Expand Down Expand Up @@ -1850,7 +1850,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
chain.stopWithoutSaving()

// Start a new blockchain back up and see where the repair leads us
pdb, err = pebble.New(datadir, 0, 0, "", false, true)
pdb, err = pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to reopen persistent key-value database: %v", err)
}
Expand Down Expand Up @@ -1915,7 +1915,7 @@ func testIssue23496(t *testing.T, scheme string) {
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")

pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
Expand Down Expand Up @@ -1973,7 +1973,7 @@ func testIssue23496(t *testing.T, scheme string) {
chain.stopWithoutSaving()

// Start a new blockchain back up and see where the repair leads us
pdb, err = pebble.New(datadir, 0, 0, "", false, true)
pdb, err = pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to reopen persistent key-value database: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1969,7 +1969,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")

pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")

pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) {
chain.triedb.Close()

// Start a new blockchain back up and see where the repair leads us
pdb, err := pebble.New(snaptest.datadir, 0, 0, "", false, true)
pdb, err := pebble.New(snaptest.datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2492,7 +2492,7 @@ func testSideImportPrunedBlocks(t *testing.T, scheme string) {
datadir := t.TempDir()
ancient := path.Join(datadir, "ancient")

pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
Expand Down
35 changes: 34 additions & 1 deletion core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,17 +591,50 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
hashes = append(hashes, hdr.Hash())
}
for _, hash := range hashes {
// Remove the associated block body and receipts if required.
//
// If the block is in the chain freezer, then this delete operation
// is actually ineffective.
if delFn != nil {
delFn(batch, hash, num)
}
// Remove the hash->number mapping along with the header itself
rawdb.DeleteHeader(batch, hash, num)
}
// Remove the number->hash mapping
rawdb.DeleteCanonicalHash(batch, num)
}
}
// Flush all accumulated deletions.
if err := batch.Write(); err != nil {
log.Crit("Failed to rewind block", "error", err)
log.Crit("Failed to commit batch in setHead", "err", err)
}
// Explicitly flush the pending writes in the key-value store to disk, ensuring
// data durability of the previous deletions.
if err := hc.chainDb.SyncKeyValue(); err != nil {
log.Crit("Failed to sync the key-value store in setHead", "err", err)
}
// Truncate the excessive chain segments in the ancient store.
// These are actually deferred deletions from the loop above.
//
// This step must be performed after synchronizing the key-value store;
// otherwise, in the event of a panic, it's theoretically possible to
// lose recent key-value store writes while the ancient store deletions
// remain, leading to data inconsistency, e.g., the gap between the key
// value store and ancient can be created due to unclean shutdown.
if delFn != nil {
// Ignore the error here since light client won't hit this path
frozen, _ := hc.chainDb.Ancients()
header := hc.CurrentHeader()

// Truncate the excessive chain segment above the current chain head
// in the ancient store.
if header.Number.Uint64()+1 < frozen {
_, err := hc.chainDb.TruncateHead(header.Number.Uint64() + 1)
if err != nil {
log.Crit("Failed to truncate head block", "err", err)
}
}
}
// Clear out any stale content from the caches
hc.headerCache.Purge()
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
continue
}
// Batch of blocks have been frozen, flush them before wiping from key-value store
if err := f.Sync(); err != nil {
if err := f.SyncAncient(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
// Wipe out all data from the active database
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func (db *nofreezedb) TruncateTail(items uint64) (uint64, error) {
return 0, errNotSupported
}

// Sync returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Sync() error {
// SyncAncient returns an error as we don't have a backing chain freezer;
// there is no ancient store whose tables could be flushed to disk.
func (db *nofreezedb) SyncAncient() error {
return errNotSupported
}

Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ func (f *Freezer) TruncateTail(tail uint64) (uint64, error) {
return old, nil
}

// Sync flushes all data tables to disk.
func (f *Freezer) Sync() error {
// SyncAncient flushes all data tables to disk.
func (f *Freezer) SyncAncient() error {
var errs []error
for _, table := range f.tables {
if err := table.Sync(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/freezer_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,8 @@ func (f *MemoryFreezer) TruncateTail(tail uint64) (uint64, error) {
return old, nil
}

// Sync flushes all data tables to disk.
func (f *MemoryFreezer) Sync() error {
// SyncAncient is a no-op: the memory freezer keeps all of its data in
// memory, so there is nothing to flush to disk.
func (f *MemoryFreezer) SyncAncient() error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/freezer_resettable.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ func (f *resettableFreezer) TruncateTail(tail uint64) (uint64, error) {
return f.freezer.TruncateTail(tail)
}

// Sync flushes all data tables to disk.
func (f *resettableFreezer) Sync() error {
// SyncAncient flushes all data tables to disk.
func (f *resettableFreezer) SyncAncient() error {
f.lock.RLock()
defer f.lock.RUnlock()

return f.freezer.Sync()
return f.freezer.SyncAncient()
}

// AncientDatadir returns the path of the ancient store.
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/freezer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func TestFreezerCloseSync(t *testing.T) {
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := f.Sync(); err == nil {
if err := f.SyncAncient(); err == nil {
t.Fatalf("want error, have nil")
} else if have, want := err.Error(), "[closed closed]"; have != want {
t.Fatalf("want %v, have %v", have, want)
Expand Down
12 changes: 9 additions & 3 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ func (t *table) TruncateTail(items uint64) (uint64, error) {
return t.db.TruncateTail(items)
}

// Sync is a noop passthrough that just forwards the request to the underlying
// SyncAncient is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) Sync() error {
return t.db.Sync()
func (t *table) SyncAncient() error {
return t.db.SyncAncient()
}

// AncientDatadir returns the ancient datadir of the underlying database.
Expand Down Expand Up @@ -188,6 +188,12 @@ func (t *table) Compact(start []byte, limit []byte) error {
return t.db.Compact(start, limit)
}

// SyncKeyValue ensures that all pending writes are flushed to disk,
// guaranteeing data durability up to that point. The request is simply
// forwarded to the underlying database.
func (t *table) SyncKeyValue() error {
return t.db.SyncKeyValue()
}

// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called, each operation prefixing all keys with the
// pre-configured string.
Expand Down
14 changes: 11 additions & 3 deletions ethdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ type KeyValueStater interface {
Stat() (string, error)
}

// KeyValueSyncer wraps the SyncKeyValue method of a backing data store.
type KeyValueSyncer interface {
// SyncKeyValue ensures that all pending writes are flushed to disk,
// guaranteeing data durability up to that point.
SyncKeyValue() error
}

// Compacter wraps the Compact method of a backing data store.
type Compacter interface {
// Compact flattens the underlying data store for the given key range. In essence,
Expand All @@ -75,6 +82,7 @@ type KeyValueStore interface {
KeyValueReader
KeyValueWriter
KeyValueStater
KeyValueSyncer
KeyValueRangeDeleter
Batcher
Iteratee
Expand Down Expand Up @@ -126,6 +134,9 @@ type AncientWriter interface {
// The integer return value is the total size of the written data.
ModifyAncients(func(AncientWriteOp) error) (int64, error)

// SyncAncient flushes all in-memory ancient store data to disk.
SyncAncient() error

// TruncateHead discards all but the first n ancient data from the ancient store.
// After the truncation, the latest item can be accessed as item_n-1 (start from 0).
TruncateHead(n uint64) (uint64, error)
Expand All @@ -138,9 +149,6 @@ type AncientWriter interface {
//
// Note that data marked as non-prunable will still be retained and remain accessible.
TruncateTail(n uint64) (uint64, error)

// Sync flushes all in-memory ancient store data to disk.
Sync() error
}

// AncientWriteOp is given to the function argument of ModifyAncients.
Expand Down
16 changes: 16 additions & 0 deletions ethdb/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,22 @@ func (db *Database) Path() string {
return db.fn
}

// SyncKeyValue is intentionally a no-op for LevelDB: no explicit WAL sync
// is performed, and the durability guarantees of ordinary writes are deemed
// sufficient here (see the rationale below).
func (db *Database) SyncKeyValue() error {
// In theory, the WAL (Write-Ahead Log) can be explicitly synchronized using
// a write operation with SYNC=true. However, there is no dedicated key reserved
// for this purpose, and even a nil key (key=nil) is considered a valid
// database entry.
//
// In LevelDB, writes are blocked until the data is written to the WAL, meaning
// recent writes won't be lost unless a power failure or system crash occurs.
// Additionally, LevelDB is no longer the default database engine and is likely
// only used by hash-mode archive nodes. Given this, the durability guarantees
// without explicit sync are acceptable in the context of LevelDB.
return nil
}

// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
func (db *Database) meter(refresh time.Duration, namespace string) {
Expand Down
6 changes: 6 additions & 0 deletions ethdb/memorydb/memorydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ func (db *Database) Compact(start []byte, limit []byte) error {
return nil
}

// SyncKeyValue is a no-op for the in-memory database: all data is held in
// memory and there is no on-disk state to flush.
func (db *Database) SyncKeyValue() error {
return nil
}

// Len returns the number of entries currently present in the memory database.
//
// Note, this method is only used for testing (i.e. not public in general) and
Expand Down
Loading