Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: versiondb don't support restore from local snapshot #1108

Merged
merged 8 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [#1083](/~https://github.com/crypto-org-chain/cronos/pull/1083) memiavl support both sdk 46 and 47 root hash rules.
- [#1091](/~https://github.com/crypto-org-chain/cronos/pull/1091) memiavl support rollback.
- [#1100](/~https://github.com/crypto-org-chain/cronos/pull/1100) memiavl support read-only mode, and grab exclusive lock for write mode.
- [#]() versiondb support restore from local snapshot.
yihuang marked this conversation as resolved.
Show resolved Hide resolved

### Improvements

Expand Down
1 change: 1 addition & 0 deletions versiondb/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func ChangeSetGroupCmd(opts Options) *cobra.Command {
IngestVersionDBSSTCmd(),
ChangeSetToVersionDBCmd(),
RestoreAppDBCmd(opts),
RestoreVersionDBCmd(),
)
return cmd
}
121 changes: 121 additions & 0 deletions versiondb/client/restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package client

import (
"fmt"
"io"
"math"
"path/filepath"
"strconv"

"cosmossdk.io/errors"
protoio "github.com/gogo/protobuf/io"
"github.com/spf13/cobra"

"github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"

"github.com/crypto-org-chain/cronos/versiondb"
"github.com/crypto-org-chain/cronos/versiondb/tsrocksdb"
)

// RestoreVersionDBCmd returns a command to restore a versiondb from local snapshot
func RestoreVersionDBCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "restore-versiondb <height> <format>",
Short: "Restore initial versiondb from local snapshot",
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := server.GetServerContextFromCmd(cmd)

height, err := strconv.ParseUint(args[0], 10, 63)
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
format, err := strconv.ParseUint(args[1], 10, 32)
if err != nil {
return err
}

store, err := server.GetSnapshotStore(ctx.Viper)
if err != nil {
return err
}

snapshot, chChunks, err := store.Load(height, uint32(format))

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
if err != nil {
return err
}

if snapshot == nil {
return fmt.Errorf("snapshot doesn't exist, height: %d, format: %d", height, format)
}

streamReader, err := snapshots.NewStreamReader(chChunks)
if err != nil {
return err
}
defer streamReader.Close()

home := ctx.Config.RootDir
versionDB, err := tsrocksdb.NewStore(filepath.Join(home, "data", "versiondb"))
if err != nil {
return err
}

ch := make(chan versiondb.ImportEntry, 128)

go func() {
defer close(ch)

if err := readSnapshotEntries(streamReader, ch); err != nil {
ctx.Logger.Error("failed to read snapshot entries", "err", err)
}
}()

return versionDB.Import(int64(height), ch)

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
},
}
return cmd
}

// readSnapshotEntries reads key-value entries from protobuf reader and feed to the channel
func readSnapshotEntries(protoReader protoio.Reader, ch chan<- versiondb.ImportEntry) error {
var (
snapshotItem snapshottypes.SnapshotItem
storeKey string
)

loop:
for {
snapshotItem = snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
if err == io.EOF {
break
} else if err != nil {
return errors.Wrap(err, "invalid protobuf message")
}

switch item := snapshotItem.Item.(type) {
case *snapshottypes.SnapshotItem_Store:
storeKey = item.Store.Name
case *snapshottypes.SnapshotItem_IAVL:
if storeKey == "" {
return errors.Wrap(err, "invalid protobuf message, store name is empty")
}
if item.IAVL.Height > math.MaxInt8 {
return fmt.Errorf("node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
ch <- versiondb.ImportEntry{
StoreKey: storeKey,
Key: item.IAVL.Key,
Value: item.IAVL.Value,
}
default:
break loop
}
}

return nil
}
33 changes: 33 additions & 0 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (

StorePrefixTpl = "s/k:%s/"
latestVersionKey = "s/latest"

ImportCommitBatchSize = 10000
)

var (
Expand Down Expand Up @@ -167,6 +169,37 @@ func (s Store) FeedChangeSet(version int64, store string, changeSet *iavl.Change
return s.db.Write(defaultWriteOpts, batch)
}

// Import loads the initial version of the state
func (s Store) Import(version int64, ch <-chan versiondb.ImportEntry) error {
batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion

var counter int
for entry := range ch {
key := cloneAppend(storePrefix(entry.StoreKey), entry.Key)
batch.PutCFWithTS(s.cfHandle, key, ts[:], entry.Value)

counter++
if counter%ImportCommitBatchSize == 0 {
if err := s.db.Write(defaultWriteOpts, batch); err != nil {
return err
}
batch.Clear()
}
}

if batch.Count() > 0 {
if err := s.db.Write(defaultWriteOpts, batch); err != nil {
return err
}
}

return s.SetLatestVersion(version)
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down
9 changes: 9 additions & 0 deletions versiondb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@ type VersionStore interface {
// the `changeSet` should be ordered by (storeKey, key),
// the version should be latest version plus one.
PutAtVersion(version int64, changeSet []types.StoreKVPair) error

// Import the initial state of the store
Import(version int64, ch <-chan ImportEntry) error
}

type ImportEntry struct {
StoreKey string
Key []byte
Value []byte
}