Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: versiondb doesn't support restore from local snapshot #1108

Merged
merged 8 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [#1083](/~https://github.com/crypto-org-chain/cronos/pull/1083) memiavl support both sdk 46 and 47 root hash rules.
- [#1091](/~https://github.com/crypto-org-chain/cronos/pull/1091) memiavl support rollback.
- [#1100](/~https://github.com/crypto-org-chain/cronos/pull/1100) memiavl support read-only mode, and grab exclusive lock for write mode.
- [#1108](/~https://github.com/crypto-org-chain/cronos/pull/1108) versiondb support restore from local snapshot.

### Improvements

Expand Down
11 changes: 9 additions & 2 deletions integration_tests/cosmoscli.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,20 @@ def distribution_commission(self, addr):
)["commission"][0]
return float(coin["amount"])

def distribution_community(self):
def distribution_community(self, **kwargs):
coin = json.loads(
self.raw(
"query",
"distribution",
"community-pool",
output="json",
node=self.node_rpc,
**kwargs,
)
)["pool"][0]
return float(coin["amount"])

def distribution_reward(self, delegator_addr):
def distribution_reward(self, delegator_addr, **kwargs):
coin = json.loads(
self.raw(
"query",
Expand All @@ -240,6 +241,7 @@ def distribution_reward(self, delegator_addr):
delegator_addr,
output="json",
node=self.node_rpc,
**kwargs,
)
)["total"][0]
return float(coin["amount"])
Expand Down Expand Up @@ -1617,6 +1619,11 @@ def changeset_ingest_versiondb_sst(self, versiondb_dir, sst_dir, **kwargs):
**kwargs,
).decode()

def restore_versiondb(self, height, format=2):
    """Run ``changeset restore-versiondb <height> <format>`` for this node.

    The command is executed through ``self.raw`` with ``home`` set to
    ``self.data_dir``; whatever ``self.raw`` returns is passed back unchanged.
    """
    command = ("changeset", "restore-versiondb", height, format)
    return self.raw(*command, home=self.data_dir)

def dump_snapshot(self, height, tarball, format=2):
return self.raw(
"snapshots", "dump", height, format, home=self.data_dir, output=tarball
Expand Down
129 changes: 73 additions & 56 deletions integration_tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,14 @@ def test_statesync(cronos):
clustercli.supervisor.stopProcess(f"{clustercli.chain_id}-node{i}")


def test_local_statesync(cronos):
def test_local_statesync(cronos, tmp_path_factory):
"""
- init a new node
- init a new node, enable versiondb
- dump snapshot on node0
- load snapshot to the new node
- restore the new node state from the snapshot
- bootstrap cometbft state
- restore the versiondb from the snapshot
- startup the node, should sync
- cleanup
"""
Expand All @@ -277,63 +278,79 @@ def test_local_statesync(cronos):
cronos.supervisorctl("start", "cronos_777-1-node0")
wait_for_port(ports.evmrpc_port(cronos.base_port(0)))

with tempfile.TemporaryDirectory() as home:
print("home", home)

i = len(cronos.config["validators"])
base_port = 26650 + i * 10
node_rpc = "tcp://127.0.0.1:%d" % ports.rpc_port(base_port)
cli = CosmosCLI.init(
"local_statesync",
Path(home),
node_rpc,
cronos.chain_binary,
"cronos_777-1",
)
home = tmp_path_factory.mktemp("local_statesync")
print("home", home)

i = len(cronos.config["validators"])
base_port = 26650 + i * 10
node_rpc = "tcp://127.0.0.1:%d" % ports.rpc_port(base_port)
cli = CosmosCLI.init(
"local_statesync",
Path(home),
node_rpc,
cronos.chain_binary,
"cronos_777-1",
)

# init the configs
peers = ",".join(
[
"tcp://%s@%s:%d"
% (
cronos.cosmos_cli(i).node_id(),
val["hostname"],
ports.p2p_port(val["base_port"]),
)
for i, val in enumerate(cronos.config["validators"])
]
)
rpc_servers = ",".join(cronos.node_rpc(i) for i in range(2))
trust_height = int(sync_info["latest_block_height"])
trust_hash = sync_info["latest_block_hash"]

cluster.edit_tm_cfg(
Path(home) / "config/config.toml",
base_port,
peers,
{
"statesync": {
"rpc_servers": rpc_servers,
"trust_height": trust_height,
"trust_hash": trust_hash,
},
# init the configs
peers = ",".join(
[
"tcp://%s@%s:%d"
% (
cronos.cosmos_cli(i).node_id(),
val["hostname"],
ports.p2p_port(val["base_port"]),
)
for i, val in enumerate(cronos.config["validators"])
]
)
rpc_servers = ",".join(cronos.node_rpc(i) for i in range(2))
trust_height = int(sync_info["latest_block_height"])
trust_hash = sync_info["latest_block_hash"]

cluster.edit_tm_cfg(
Path(home) / "config/config.toml",
base_port,
peers,
{
"statesync": {
"rpc_servers": rpc_servers,
"trust_height": trust_height,
"trust_hash": trust_hash,
},
)
},
)
cluster.edit_app_cfg(
Path(home) / "config/app.toml",
base_port,
{
"store": {
"streamers": ["versiondb"],
},
},
)

# restore the states
cli.load_snapshot(tarball)
print(cli.list_snapshot())
cli.restore_snapshot(height)
cli.bootstrap_state()

with subprocess.Popen(
[cronos.chain_binary, "start", "--home", home],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
):
wait_for_port(ports.rpc_port(base_port))
# check the node sync normally
wait_for_new_blocks(cli, 2)
# restore the states
cli.load_snapshot(tarball)
print(cli.list_snapshot())
cli.restore_snapshot(height)
cli.bootstrap_state()
cli.restore_versiondb(height)

with subprocess.Popen(
[cronos.chain_binary, "start", "--home", home],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
):
wait_for_port(ports.rpc_port(base_port))
# check the node sync normally
wait_for_new_blocks(cli, 2)
# check grpc works
print("distribution", cli.distribution_community(height=height))
with pytest.raises(Exception) as exc_info:
cli.distribution_community(height=height - 1)

assert "Stored fee pool should not have been nil" in exc_info.value.args[0]


def test_transaction(cronos):
Expand Down
1 change: 1 addition & 0 deletions versiondb/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func ChangeSetGroupCmd(opts Options) *cobra.Command {
IngestVersionDBSSTCmd(),
ChangeSetToVersionDBCmd(),
RestoreAppDBCmd(opts),
RestoreVersionDBCmd(),
)
return cmd
}
121 changes: 121 additions & 0 deletions versiondb/client/restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package client

import (
"fmt"
"io"
"math"
"path/filepath"
"strconv"

"cosmossdk.io/errors"
protoio "github.com/gogo/protobuf/io"
"github.com/spf13/cobra"

"github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"

"github.com/crypto-org-chain/cronos/versiondb"
"github.com/crypto-org-chain/cronos/versiondb/tsrocksdb"
)

// RestoreVersionDBCmd returns a command to restore a versiondb from local snapshot
func RestoreVersionDBCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "restore-versiondb <height> <format>",
Short: "Restore initial versiondb from local snapshot",
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := server.GetServerContextFromCmd(cmd)

height, err := strconv.ParseUint(args[0], 10, 63)
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
format, err := strconv.ParseUint(args[1], 10, 32)
if err != nil {
return err
}

store, err := server.GetSnapshotStore(ctx.Viper)
if err != nil {
return err
}

snapshot, chChunks, err := store.Load(height, uint32(format))

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
if err != nil {
return err
}

if snapshot == nil {
return fmt.Errorf("snapshot doesn't exist, height: %d, format: %d", height, format)
}

streamReader, err := snapshots.NewStreamReader(chChunks)
if err != nil {
return err
}
defer streamReader.Close()

home := ctx.Config.RootDir
versionDB, err := tsrocksdb.NewStore(filepath.Join(home, "data", "versiondb"))
if err != nil {
return err
}

ch := make(chan versiondb.ImportEntry, 128)

go func() {
defer close(ch)

if err := readSnapshotEntries(streamReader, ch); err != nil {
ctx.Logger.Error("failed to read snapshot entries", "err", err)
}
}()

return versionDB.Import(int64(height), ch)

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
},
}
return cmd
}

// readSnapshotEntries reads key-value entries from protobuf reader and feed to the channel.
//
// A Store item sets the store name applied to the IAVL items that follow it;
// each IAVL item is forwarded to ch as an ImportEntry. Reading stops without
// error at io.EOF or at the first item that is neither a Store nor an IAVL
// item. The channel is not closed here; that is left to the caller.
//
// An error is returned when a message cannot be decoded, when an IAVL item
// appears before any Store item, or when an IAVL node height exceeds
// math.MaxInt8.
func readSnapshotEntries(protoReader protoio.Reader, ch chan<- versiondb.ImportEntry) error {
	var (
		snapshotItem snapshottypes.SnapshotItem
		storeKey     string
	)

loop:
	for {
		// Reset before each read so fields of the previous message do not
		// leak into the next one.
		snapshotItem = snapshottypes.SnapshotItem{}
		err := protoReader.ReadMsg(&snapshotItem)
		if err == io.EOF {
			break
		} else if err != nil {
			return errors.Wrap(err, "invalid protobuf message")
		}

		switch item := snapshotItem.Item.(type) {
		case *snapshottypes.SnapshotItem_Store:
			storeKey = item.Store.Name
		case *snapshottypes.SnapshotItem_IAVL:
			if storeKey == "" {
				// err is nil at this point, so wrapping it would produce a
				// nil error and hide the problem; build a fresh error.
				return fmt.Errorf("invalid protobuf message, store name is empty")
			}
			if item.IAVL.Height > math.MaxInt8 {
				return fmt.Errorf("node height %v cannot exceed %v",
					item.IAVL.Height, math.MaxInt8)
			}
			ch <- versiondb.ImportEntry{
				StoreKey: storeKey,
				Key:      item.IAVL.Key,
				Value:    item.IAVL.Value,
			}
		default:
			// Any other item type ends the store/IAVL section.
			break loop
		}
	}

	return nil
}
33 changes: 33 additions & 0 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (

StorePrefixTpl = "s/k:%s/"
latestVersionKey = "s/latest"

ImportCommitBatchSize = 10000
)

var (
Expand Down Expand Up @@ -167,6 +169,37 @@ func (s Store) FeedChangeSet(version int64, store string, changeSet *iavl.Change
return s.db.Write(defaultWriteOpts, batch)
}

// Import loads the initial version of the state
func (s Store) Import(version int64, ch <-chan versiondb.ImportEntry) error {
batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion

var counter int
for entry := range ch {
key := cloneAppend(storePrefix(entry.StoreKey), entry.Key)
batch.PutCFWithTS(s.cfHandle, key, ts[:], entry.Value)

counter++
if counter%ImportCommitBatchSize == 0 {
if err := s.db.Write(defaultWriteOpts, batch); err != nil {
return err
}
batch.Clear()
}
}

if batch.Count() > 0 {
if err := s.db.Write(defaultWriteOpts, batch); err != nil {
return err
}
}

return s.SetLatestVersion(version)
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down
9 changes: 9 additions & 0 deletions versiondb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@ type VersionStore interface {
// the `changeSet` should be ordered by (storeKey, key),
// the version should be latest version plus one.
PutAtVersion(version int64, changeSet []types.StoreKVPair) error

// Import the initial state of the store
Import(version int64, ch <-chan ImportEntry) error
}

// ImportEntry is a single key/value pair tagged with the store it belongs to;
// it is the element type of the channel consumed by Import.
type ImportEntry struct {
	// StoreKey names the store the pair belongs to.
	StoreKey string
	// Key and Value are the raw bytes of the pair.
	Key, Value []byte
}