Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stateless witness prefetcher changes #29519

Merged
8 changes: 6 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,8 +1806,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
statedb.SetLogger(bc.logger)

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
// If we are past Byzantium, enable prefetching to pull in trie node paths
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
if bc.chainConfig.IsByzantium(block.Number()) {
statedb.StartPrefetcher("chain")
}
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
Expand Down
97 changes: 71 additions & 26 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"maps"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -33,6 +34,14 @@ import (
"github.com/holiman/uint256"
)

// hasherPool holds a pool of hashers used by state objects during concurrent
// trie updates.
var hasherPool = sync.Pool{
New: func() interface{} {
return crypto.NewKeccakState()
},
}

type Storage map[common.Hash]common.Hash

func (s Storage) Copy() Storage {
Expand Down Expand Up @@ -118,27 +127,39 @@ func (s *stateObject) touch() {
}
}

// getTrie returns the associated storage trie. The trie will be opened
// if it's not loaded previously. An error will be returned if trie can't
// be loaded.
// getTrie returns the associated storage trie. The trie will be opened if it'
// not loaded previously. An error will be returned if trie can't be loaded.
//
// If a new trie is opened, it will be cached within the state object to allow
// subsequent reads to expand the same trie instead of reloading from disk.
func (s *stateObject) getTrie() (Trie, error) {
if s.trie == nil {
// Try fetching from prefetcher first
if s.data.Root != types.EmptyRootHash && s.db.prefetcher != nil {
// When the miner is creating the pending state, there is no prefetcher
s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root)
}
if s.trie == nil {
tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie)
if err != nil {
return nil, err
}
s.trie = tr
tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie)
if err != nil {
return nil, err
}
s.trie = tr
}
return s.trie, nil
}

// getPrefetchedTrie returns the associated trie, as populated by the prefetcher
// if it's available.
//
// Note, opposed to getTrie, this method will *NOT* blindly cache the resulting
// trie in the state object. The caller might want to do that, but it's cleaner
// to break the hidden interdependency between retrieving tries from the db or
// from the prefetcher.
func (s *stateObject) getPrefetchedTrie() (Trie, error) {
// If there's nothing to meaningfully return, let the user figure it out by
// pulling the trie from disk.
if s.data.Root == types.EmptyRootHash || s.db.prefetcher == nil {
return nil, nil
}
// Attempt to retrieve the trie from the pretecher
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo pretecher => prefetcher

return s.db.prefetcher.trie(s.addrHash, s.data.Root)
}

// GetState retrieves a value from the account storage trie.
func (s *stateObject) GetState(key common.Hash) common.Hash {
value, _ := s.getState(key)
Expand Down Expand Up @@ -253,7 +274,7 @@ func (s *stateObject) setState(key common.Hash, value *common.Hash) {

// finalise moves all dirty storage slots into the pending area to be hashed or
// committed later. It is invoked at the end of every transaction.
func (s *stateObject) finalise(prefetch bool) {
func (s *stateObject) finalise() {
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
// If the slot is different from its original value, move it into the
Expand All @@ -268,8 +289,10 @@ func (s *stateObject) finalise(prefetch bool) {
delete(s.pendingStorage, key)
}
}
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch)
if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch); err != nil {
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
}
}
if len(s.dirtyStorage) > 0 {
s.dirtyStorage = make(Storage)
Expand All @@ -288,25 +311,43 @@ func (s *stateObject) finalise(prefetch bool) {
// storage change at all.
func (s *stateObject) updateTrie() (Trie, error) {
// Make sure all dirty slots are finalized into the pending storage area
s.finalise(false)
s.finalise()

// Short circuit if nothing changed, don't bother with hashing anything
if len(s.pendingStorage) == 0 {
return s.trie, nil
}
// Retrieve a pretecher populated trie, or fall back to the database
tr, err := s.getPrefetchedTrie()
switch {
case err != nil:
// Fetcher retrieval failed, something's very wrong, abort
s.db.setError(err)
return nil, err

case tr == nil:
// Fetcher not running or empty trie, fallback to the database trie
tr, err = s.getTrie()
if err != nil {
s.db.setError(err)
return nil, err
}

default:
// Prefetcher returned a live trie, swap it out for the current one
s.trie = tr
}
// The snapshot storage map for the object
var (
storage map[common.Hash][]byte
origin map[common.Hash][]byte
)
tr, err := s.getTrie()
if err != nil {
s.db.setError(err)
return nil, err
}
// Insert all the pending storage updates into the trie
usedStorage := make([][]byte, 0, len(s.pendingStorage))

hasher := hasherPool.Get().(crypto.KeccakState)
defer hasherPool.Put(hasher)

// Perform trie updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
//
Expand Down Expand Up @@ -335,26 +376,30 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageUpdated += 1
s.db.StorageUpdated.Add(1)
} else {
deletions = append(deletions, key)
}
// Cache the mutated storage slots until commit
if storage == nil {
s.db.storagesLock.Lock()
if storage = s.db.storages[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.storages[s.addrHash] = storage
}
s.db.storagesLock.Unlock()
}
khash := crypto.HashData(s.db.hasher, key[:])
khash := crypto.HashData(hasher, key[:])
storage[khash] = encoded // encoded will be nil if it's deleted

// Cache the original value of mutated storage slots
if origin == nil {
s.db.storagesLock.Lock()
if origin = s.db.storagesOrigin[s.address]; origin == nil {
origin = make(map[common.Hash][]byte)
s.db.storagesOrigin[s.address] = origin
}
s.db.storagesLock.Unlock()
}
// Track the original value of slot only if it's mutated first time
if _, ok := origin[khash]; !ok {
Expand All @@ -374,7 +419,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageDeleted += 1
s.db.StorageDeleted.Add(1)
}
// If no slots were touched, issue a warning as we shouldn't have done all
// the above work in the first place
Expand Down