Skip to content

Commit

Permalink
feat: iavl async pruning (#593)
Browse files Browse the repository at this point in the history
* iavl async pruning

* go mod tidy

---------

Co-authored-by: Adam Tucker <[email protected]>
  • Loading branch information
cool-develope and czarcas7ic committed May 9, 2024
1 parent 0609929 commit e08f864
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 11 deletions.
8 changes: 8 additions & 0 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (ms multiStore) Commit() storetypes.CommitID {
panic("not implemented")
}

func (ms multiStore) SetCommitting() {
panic("not implemented")
}

func (ms multiStore) UnsetCommitting() {
panic("not implemented")
}

func (ms multiStore) LastCommitID() storetypes.CommitID {
panic("not implemented")
}
Expand Down
13 changes: 12 additions & 1 deletion store/iavl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func LoadStore(db dbm.DB, logger log.Logger, key types.StoreKey, id types.Commit
// provided DB. An error is returned if the version fails to load, or if called with a positive
// version on an empty tree.
func LoadStoreWithInitialVersion(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, initialVersion uint64, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics) (types.CommitKVStore, error) {
tree := iavl.NewMutableTree(wrapper.NewDBWrapper(db), cacheSize, disableFastNode, logger, iavl.InitialVersionOption(initialVersion))
tree := iavl.NewMutableTree(wrapper.NewDBWrapper(db), cacheSize, disableFastNode, logger, iavl.InitialVersionOption(initialVersion), iavl.AsyncPruningOption(true))

isUpgradeable, err := tree.IsUpgradeable()
if err != nil {
Expand Down Expand Up @@ -118,6 +118,17 @@ func (st *Store) GetImmutable(version int64) (*Store, error) {
}, nil
}

// SetCommitting marks the store as committing, which will prevent any
// parallel writes to the store. It is referenced in the async pruning.
func (st *Store) SetCommitting() {
st.tree.SetCommitting()
}

// UnsetCommitting marks the store as not committing.
func (st *Store) UnsetCommitting() {
st.tree.UnsetCommitting()
}

// Commit commits the current store state and returns a CommitID with the new
// version and hash.
func (st *Store) Commit() types.CommitID {
Expand Down
10 changes: 10 additions & 0 deletions store/iavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type (
Get(key []byte) ([]byte, error)
Set(key, value []byte) (bool, error)
Remove(key []byte) ([]byte, bool, error)
SetCommitting()
UnsetCommitting()
SaveVersion() ([]byte, int64, error)
Version() int64
Hash() []byte
Expand Down Expand Up @@ -53,6 +55,14 @@ func (it *immutableTree) Remove(_ []byte) ([]byte, bool, error) {
panic("cannot call 'Remove' on an immutable IAVL tree")
}

func (it *immutableTree) SetCommitting() {
panic("cannot call 'SetCommitting' on an immutable IAVL tree")
}

func (it *immutableTree) UnsetCommitting() {
panic("cannot call 'UnsetCommitting' on an immutable IAVL tree")
}

func (it *immutableTree) SaveVersion() ([]byte, int64, error) {
panic("cannot call 'SaveVersion' on an immutable IAVL tree")
}
Expand Down
6 changes: 6 additions & 0 deletions store/mem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach
// Commit performs a no-op as entries are persistent between commitments.
func (s *Store) Commit() (id types.CommitID) { return }

// Implements CommitStore.
func (s *Store) SetCommitting() {}

// Implements CommitStore.
func (s *Store) UnsetCommitting() {}

func (s *Store) SetPruning(pruning pruningtypes.PruningOptions) {}

// GetPruning is a no-op as pruning options cannot be directly set on this store.
Expand Down
4 changes: 4 additions & 0 deletions store/rootmulti/dbadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (cdsa commitDBStoreAdapter) WorkingHash() []byte {
return commithash
}

func (cdsa commitDBStoreAdapter) SetCommitting() {}

func (cdsa commitDBStoreAdapter) UnsetCommitting() {}

func (cdsa commitDBStoreAdapter) SetPruning(_ pruningtypes.PruningOptions) {}

// GetPruning is a no-op as pruning options cannot be directly set on this store.
Expand Down
16 changes: 16 additions & 0 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,9 @@ func (rs *Store) Commit() types.CommitID {
rs.logger.Debug("commit header and version mismatch", "header_height", rs.commitHeader.Height, "version", version)
}

rs.SetCommitting()
rs.lastCommitInfo = commitStores(version, rs.stores, rs.removalMap)
rs.UnsetCommitting()
rs.lastCommitInfo.Timestamp = rs.commitHeader.Time
defer rs.flushMetadata(rs.db, version, rs.lastCommitInfo)

Expand Down Expand Up @@ -547,6 +549,20 @@ func (rs *Store) WorkingHash() []byte {
return types.CommitInfo{StoreInfos: storeInfos}.Hash()
}

// SetCommitting implements Committer/CommitStore.
func (rs *Store) SetCommitting() {
for _, store := range rs.stores {
store.SetCommitting()
}
}

// UnsetCommitting implements Committer/CommitStore.
func (rs *Store) UnsetCommitting() {
for _, store := range rs.stores {
store.UnsetCommitting()
}
}

// CacheWrap implements CacheWrapper/Store/CommitStore.
func (rs *Store) CacheWrap() types.CacheWrap {
return rs.CacheMultiStore().(types.CacheWrap)
Expand Down
27 changes: 17 additions & 10 deletions store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ func TestMultiStore_Pruning(t *testing.T) {
ms.Commit()
}

// asyn pruning, simulate the consensus process
time.Sleep(150 * time.Millisecond)
ms.Commit()

for _, v := range tc.saved {
_, err := ms.CacheMultiStoreWithVersion(v)
require.NoError(t, err, "expected no error when loading height: %d", v)
Expand Down Expand Up @@ -563,16 +567,6 @@ func TestMultiStore_Pruning_SameHeightsTwice(t *testing.T) {

require.Equal(t, numVersions, lastCommitInfo.Version)

for v := int64(1); v < numVersions-int64(keepRecent); v++ {
err := ms.LoadVersion(v)
require.Error(t, err, "expected error when loading pruned height: %d", v)
}

for v := (numVersions - int64(keepRecent)); v < numVersions; v++ {
err := ms.LoadVersion(v)
require.NoError(t, err, "expected no error when loading height: %d", v)
}

// Get latest
err := ms.LoadVersion(numVersions - 1)
require.NoError(t, err)
Expand All @@ -585,8 +579,19 @@ func TestMultiStore_Pruning_SameHeightsTwice(t *testing.T) {
require.Equal(t, numVersions, lastCommitInfo.Version)

// Ensure that can commit one more height with no panic
time.Sleep(150 * time.Millisecond)
lastCommitInfo = ms.Commit()
require.Equal(t, numVersions+1, lastCommitInfo.Version)

for v := int64(1); v < numVersions-int64(keepRecent); v++ {
err := ms.LoadVersion(v)
require.Error(t, err, "expected error when loading pruned height: %d", v)
}

for v := int64(numVersions - int64(keepRecent)); v < numVersions; v++ {
err := ms.LoadVersion(v)
require.NoError(t, err, "expected no error when loading height: %d", v)
}
}

func TestMultiStore_PruningRestart(t *testing.T) {
Expand All @@ -613,6 +618,8 @@ func TestMultiStore_PruningRestart(t *testing.T) {

// commit one more block and ensure the heights have been pruned
ms.Commit()
time.Sleep(150 * time.Millisecond)
ms.Commit()

actualHeightToPrune = ms.pruningManager.GetPruningHeight(ms.LatestVersion())
require.Equal(t, int64(8), actualHeightToPrune)
Expand Down
6 changes: 6 additions & 0 deletions store/transient/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func (ts *Store) WorkingHash() []byte {
return []byte{}
}

// Implements CommitStore
func (ts *Store) SetCommitting() {}

// Implements CommitStore
func (ts *Store) UnsetCommitting() {}

// Implements Store.
func (ts *Store) GetStoreType() types.StoreType {
return types.StoreTypeTransient
Expand Down
2 changes: 2 additions & 0 deletions store/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Store interface {
type Committer interface {
Commit() CommitID
LastCommitID() CommitID
SetCommitting()
UnsetCommitting()

// WorkingHash returns the hash of the KVStore's state before commit.
WorkingHash() []byte
Expand Down

0 comments on commit e08f864

Please sign in to comment.