Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: memiavl snapshot rewriting is not triggered #1034

Merged
merged 18 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- [#1028](https://github.com/crypto-org-chain/cronos/pull/1028) Add memiavl configs into app.toml
- [#1027](https://github.com/crypto-org-chain/cronos/pull/1027) Integrate local state-sync commands.
- [#1029](https://github.com/crypto-org-chain/cronos/pull/1029) Change config `async-commit` to `async-commit-buffer`, make the channel size configurable.
- [#1034](https://github.com/crypto-org-chain/cronos/pull/1034) Support memiavl snapshot strategy configuration.
yihuang marked this conversation as resolved.
Show resolved Hide resolved

*April 13, 2023*

Expand Down
10 changes: 8 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ import (

// this line is used by starport scaffolding # stargate/app/moduleImport

memiavlrootmulti "github.com/crypto-org-chain/cronos/store/rootmulti"
"github.com/crypto-org-chain/cronos/v2/x/cronos"
cronosclient "github.com/crypto-org-chain/cronos/v2/x/cronos/client"
cronoskeeper "github.com/crypto-org-chain/cronos/v2/x/cronos/keeper"
Expand Down Expand Up @@ -995,6 +996,11 @@ func VerifyAddressFormat(bz []byte) error {
return nil
}

func setCMS(cms storetypes.CommitMultiStore) func(*baseapp.BaseApp) {
return func(bapp *baseapp.BaseApp) { bapp.SetCMS(cms) }
// Close is invoked by the start command during graceful shutdown.
//
// When the application's commit multi-store is the memiavl root multi-store,
// Close returns the result of WaitAsyncCommit on that store; for any other
// store type there is nothing to do and it returns nil.
func (app *App) Close() error {
	store, isMemIAVL := app.CommitMultiStore().(*memiavlrootmulti.Store)
	if !isMemIAVL {
		return nil
	}

	return store.WaitAsyncCommit()
}
4 changes: 4 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type MemIAVLConfig struct {
// AsyncCommitBuffer defines the size of the asynchronous commit queue, which greatly improves block catching-up
// performance; -1 means synchronous commit.
AsyncCommitBuffer int `mapstructure:"async-commit-buffer"`
// SnapshotKeepRecent defines what old snapshots to keep after new snapshots are taken.
SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
// SnapshotInterval defines the block interval at which the memiavl snapshot is taken, defaults to 1000.
SnapshotInterval uint32 `mapstructure:"snapshot-interval"`
}

func DefaultMemIAVLConfig() MemIAVLConfig {
Expand Down
6 changes: 6 additions & 0 deletions app/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ zero-copy = {{ .MemIAVL.ZeroCopy }}
# AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up
# performance, -1 means synchronous commit.
async-commit-buffer = {{ .MemIAVL.AsyncCommitBuffer }}

# SnapshotKeepRecent defines what old snapshots to keep after new snapshots are taken.
snapshot-keep-recent = {{ .MemIAVL.SnapshotKeepRecent }}

# SnapshotInterval defines the block interval the memiavl snapshot is taken, default to 1000.
snapshot-interval = {{ .MemIAVL.SnapshotInterval }}
`
22 changes: 17 additions & 5 deletions app/memiavl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,37 @@ import (

"github.com/cosmos/cosmos-sdk/baseapp"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
storetypes "github.com/cosmos/cosmos-sdk/store/types"

"github.com/crypto-org-chain/cronos/memiavl"
"github.com/crypto-org-chain/cronos/store/rootmulti"
)

const (
FlagMemIAVL = "memiavl.enable"
FlagAsyncCommitBuffer = "memiavl.async-commit-buffer"
FlagZeroCopy = "memiavl.zero-copy"
FlagMemIAVL = "memiavl.enable"
FlagAsyncCommitBuffer = "memiavl.async-commit-buffer"
FlagZeroCopy = "memiavl.zero-copy"
FlagSnapshotKeepRecent = "memiavl.snapshot-keep-recent"
FlagSnapshotInterval = "memiavl.snapshot-interval"
)

func SetupMemIAVL(logger log.Logger, homePath string, appOpts servertypes.AppOptions, baseAppOptions []func(*baseapp.BaseApp)) []func(*baseapp.BaseApp) {
if cast.ToBool(appOpts.Get(FlagMemIAVL)) {
// cms must be overridden before the other options, because they may use the cms;
// make sure the cms isn't overridden by the other options later on.
cms := rootmulti.NewStore(filepath.Join(homePath, "data", "memiavl.db"), logger)
cms.SetAsyncCommitBuffer(cast.ToInt(appOpts.Get(FlagAsyncCommitBuffer)))
cms.SetZeroCopy(cast.ToBool(appOpts.Get(FlagZeroCopy)))
cms.SetMemIAVLOptions(memiavl.Options{
AsyncCommitBuffer: cast.ToInt(appOpts.Get(FlagAsyncCommitBuffer)),
ZeroCopy: cast.ToBool(appOpts.Get(FlagZeroCopy)),
SnapshotKeepRecent: cast.ToUint32(appOpts.Get(FlagSnapshotKeepRecent)),
SnapshotInterval: cast.ToUint32(appOpts.Get(FlagSnapshotInterval)),
})
baseAppOptions = append([]func(*baseapp.BaseApp){setCMS(cms)}, baseAppOptions...)
}

return baseAppOptions
}

// setCMS builds a baseapp option that installs cms as the application's
// commit multi-store by calling SetCMS on the BaseApp it is applied to.
func setCMS(cms storetypes.CommitMultiStore) func(*baseapp.BaseApp) {
	option := func(bapp *baseapp.BaseApp) {
		bapp.SetCMS(cms)
	}
	return option
}
1 change: 1 addition & 0 deletions integration_tests/configs/default.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
memiavl: {
enable: true,
'zero-copy': true,
'snapshot-interval': 5,
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
},
store: {
streamers: ['versiondb'],
Expand Down
71 changes: 57 additions & 14 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import (
"sync"

storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tidwall/wal"
)

// DefaultSnapshotInterval is the snapshot interval, in blocks, that Load falls
// back to when Options.SnapshotInterval is left at zero.
const DefaultSnapshotInterval = 1000

// DB implements DB-like functionalities on top of MultiTree:
// - async snapshot rewriting
// - Write-ahead-log
Expand All @@ -31,10 +34,12 @@ import (
// ```
type DB struct {
MultiTree
dir string
dir string
logger log.Logger

snapshotRewriteChan chan snapshotResult
snapshotKeepRecent uint32
snapshotInterval uint32
pruneSnapshotLock sync.Mutex

// invariant: the LastIndex always match the current version of MultiTree
Expand All @@ -57,14 +62,17 @@ type DB struct {
}

type Options struct {
Logger log.Logger
CreateIfMissing bool
InitialVersion uint32
// the initial stores when initialize the empty instance
InitialStores []string
SnapshotKeepRecent uint32
SnapshotInterval uint32
// load the target version instead of latest version
TargetVersion uint32
// Buffer size for the asynchronous commit queue, -1 means synchronous commit
// Buffer size for the asynchronous commit queue, -1 means synchronous commit,
// default to 0.
AsyncCommitBuffer int
// ZeroCopy if true, the get and iterator methods could return a slice pointing to mmaped blob files.
ZeroCopy bool
Expand Down Expand Up @@ -100,10 +108,20 @@ func Load(dir string, opts Options) (*DB, error) {

db := &DB{
MultiTree: *mtree,
logger: opts.Logger,
dir: dir,
wal: wal,
walChanSize: opts.AsyncCommitBuffer,
snapshotKeepRecent: opts.SnapshotKeepRecent,
snapshotInterval: opts.SnapshotInterval,
}

if db.logger == nil {
db.logger = log.NewNopLogger()
}

if db.snapshotInterval == 0 {
db.snapshotInterval = DefaultSnapshotInterval
}

if db.Version() == 0 && len(opts.InitialStores) > 0 {
Expand Down Expand Up @@ -193,28 +211,40 @@ func (db *DB) checkBackgroundSnapshotRewrite() error {
}
// prune the old snapshots
// wait until last prune finish
newSnapshotVersion := result.mtree.metadata.CommitInfo.Version
db.pruneSnapshotLock.Lock()
go func() {
defer db.pruneSnapshotLock.Unlock()

entries, err := os.ReadDir(db.dir)
if err == nil {
for _, entry := range entries {
if entry.IsDir() && strings.HasPrefix(entry.Name(), SnapshotPrefix) {
currentVersion, err := strconv.ParseInt(strings.TrimPrefix(entry.Name(), SnapshotPrefix), 10, 32)
if err != nil {
fmt.Printf("failed when parse current version: %s\n", err)
continue
if err != nil {
db.logger.Error("failed to read db dir", "err", err)
return
}

var prunedVersion int64
for _, entry := range entries {
if entry.IsDir() && strings.HasPrefix(entry.Name(), SnapshotPrefix) {
snapshotVersion, err := strconv.ParseInt(strings.TrimPrefix(entry.Name(), SnapshotPrefix), 10, 32)
if err != nil {
db.logger.Error("failed when parse snapshot file name", "err", err)
continue
}
if snapshotVersion < newSnapshotVersion-int64(db.snapshotKeepRecent) {
Fixed Show fixed Hide fixed
fullPath := filepath.Join(db.dir, entry.Name())
if err := os.RemoveAll(fullPath); err != nil {
db.logger.Error("failed when remove old snapshot", "err", err)
}
if result.mtree.metadata.CommitInfo.Version-currentVersion > int64(db.snapshotKeepRecent) {
fullPath := filepath.Join(db.dir, entry.Name())
if err := os.RemoveAll(fullPath); err != nil {
fmt.Printf("failed when remove old snapshot: %s\n", err)
}
if snapshotVersion > prunedVersion {
prunedVersion = snapshotVersion
}
}
}
}

if err := db.wal.TruncateFront(uint64(newSnapshotVersion + 1)); err != nil {
Fixed Show fixed Hide fixed
db.logger.Error("failed to truncate wal", "err", err, "version", prunedVersion)
}
}()
return nil

Expand Down Expand Up @@ -265,6 +295,8 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {

db.pendingUpgrades = db.pendingUpgrades[:0]

db.rewriteIfApplicable(v)

return hash, v, nil
}

Expand Down Expand Up @@ -372,6 +404,17 @@ func (db *DB) reloadMultiTree(mtree *MultiTree) error {
return nil
}

// rewriteIfApplicable executes the snapshot rewrite strategy according to the current height.
func (db *DB) rewriteIfApplicable(height int64) {
if height%int64(db.snapshotInterval) != 0 {
Fixed Show fixed Hide fixed

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
return
}

if err := db.RewriteSnapshotBackground(); err != nil {
db.logger.Error("failed to rewrite snapshot in background", "err", err)
}
}

type snapshotResult struct {
mtree *MultiTree
err error
Expand Down
3 changes: 2 additions & 1 deletion memiavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (t *Tree) SetInitialVersion(initialVersion int64) error {
return nil
}

// Copy returns a snapshot of the tree which won't be corrupted by further modifications on the main tree.
// Copy returns a snapshot of the tree which won't be modified by further modifications on the main tree,
// the returned new tree can be accessed concurrently with the main tree.
func (t *Tree) Copy() *Tree {
if _, ok := t.root.(*MemNode); ok {
// protect the existing `MemNode`s from get modified in-place
Expand Down
24 changes: 9 additions & 15 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ type Store struct {

interBlockCache types.MultiStorePersistentCache

asyncCommitBuffer int
zeroCopy bool
opts memiavl.Options
}

func NewStore(dir string, logger log.Logger) *Store {
Expand Down Expand Up @@ -269,13 +268,12 @@ func (rs *Store) LoadVersionAndUpgrade(version int64, upgrades *types.StoreUpgra
initialStores = append(initialStores, key.Name())
}
}
db, err := memiavl.Load(rs.dir, memiavl.Options{
CreateIfMissing: true,
InitialStores: initialStores,
TargetVersion: uint32(version),
AsyncCommitBuffer: rs.asyncCommitBuffer,
ZeroCopy: rs.zeroCopy,
})

opts := rs.opts
opts.CreateIfMissing = true
opts.InitialStores = initialStores
opts.TargetVersion = uint32(version)

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
db, err := memiavl.Load(rs.dir, opts)
if err != nil {
return errors.Wrapf(err, "fail to load memiavl at %s", rs.dir)
}
Expand Down Expand Up @@ -388,12 +386,8 @@ func (rs *Store) SetIAVLDisableFastNode(disable bool) {
// SetLazyLoading is a no-op for this store: the lazyLoading argument is ignored.
func (rs *Store) SetLazyLoading(lazyLoading bool) {
}

func (rs *Store) SetAsyncCommitBuffer(size int) {
rs.asyncCommitBuffer = size
}

func (rs *Store) SetZeroCopy(zeroCopy bool) {
rs.zeroCopy = zeroCopy
// SetMemIAVLOptions replaces the store's memiavl options with opts.
//
// The stored options are used as the base for memiavl.Load in
// LoadVersionAndUpgrade, which overrides CreateIfMissing, InitialStores and
// TargetVersion on its own copy, so values set for those fields here are not
// the ones passed to Load.
func (rs *Store) SetMemIAVLOptions(opts memiavl.Options) {
rs.opts = opts
}

// Implements interface CommitMultiStore
Expand Down