Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: no command to patch the duplicated tx situation #522

Merged
merged 6 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

- [cronos#489](/~https://github.com/crypto-org-chain/cronos/pull/489) Enable jemalloc memory allocator, and update rocksdb src to `v6.29.5`.
- [#513](/~https://github.com/crypto-org-chain/cronos/pull/513) Add `fix-unlucky-tx` command to patch txs post v0.7.0 upgrade.
- [cronos#522](/~https://github.com/crypto-org-chain/cronos/pull/522) Add `reindex-duplicated-tx` command to handle the tendermint tx duplicated issue.

*May 3, 2022*

Expand Down
186 changes: 186 additions & 0 deletions cmd/cronosd/cmd/reindex-duplicated-tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package cmd

import (
"bufio"
"context"
"fmt"
"os"
"runtime"
"strconv"
"sync"

"github.com/spf13/cobra"

"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/server"
abci "github.com/tendermint/tendermint/abci/types"
)

const (
FlagPrintTxs = "print-txs"
FlagBlocksFile = "blocks-file"
FlagStartBlock = "start-block"
FlagEndBlock = "end-block"
FlagConcurrency = "concurrency"
)

// ReindexDuplicatedTx update the tx execution result of false-failed tx in tendermint db
func ReindexDuplicatedTx() *cobra.Command {
cmd := &cobra.Command{
Use: "reindex-duplicated-tx",
Short: "Reindex tx that suffer from tendermint duplicated tx issue, it can handle both v0.6.x and v0.7.x blocks.",
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := server.GetServerContextFromCmd(cmd)

chainID, err := cmd.Flags().GetString(flags.FlagChainID)
if err != nil {
return err
}
printTxs, err := cmd.Flags().GetBool(FlagPrintTxs)
if err != nil {
return err
}
yihuang marked this conversation as resolved.
Show resolved Hide resolved

tmDB, err := openTMDB(ctx.Config, chainID)
if err != nil {
return err
}

// iterate and patch a single block
processBlock := func(height int64) error {
blockResult, err := tmDB.stateStore.LoadABCIResponses(height)
if err != nil {
return err
}
block := tmDB.blockStore.LoadBlock(height)
if block == nil {
return fmt.Errorf("block not found: %d", height)
}

for txIndex, txResult := range blockResult.DeliverTxs {
tx := block.Txs[txIndex]
txHash := tx.Hash()
indexed, err := tmDB.txIndexer.Get(txHash)
if err != nil {
return err
}
if txResult.Code == 0 && txResult.Code != indexed.Result.Code {
if printTxs {
fmt.Println(height, txIndex)
continue
}
// a success tx but indexed wrong, reindex the tx
result := &abci.TxResult{
Height: height,
Index: uint32(txIndex),
Tx: tx,
Result: *txResult,
}

if err := tmDB.txIndexer.Index(result); err != nil {
return err
}
}
}

return nil
}

concurrency, err := cmd.Flags().GetInt(FlagConcurrency)
if err != nil {
return err
}

blockChan := make(chan int64, concurrency)
var wg sync.WaitGroup
ctCtx, cancel := context.WithCancel(context.Background())
defer cancel()

for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctCtx.Done():
return
case blockNum, ok := <-blockChan:
if !ok {
return
}

if err := processBlock(blockNum); err != nil {
fmt.Fprintf(os.Stderr, "process block failed: %d, %+v", blockNum, err)
cancel()
yihuang marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}
}(&wg)
}

blocksFile, err := cmd.Flags().GetString(FlagBlocksFile)
if err != nil {
return err
}
findBlock := func() error {
if len(blocksFile) > 0 {
// read block numbers from file, one number per line
file, err := os.Open(blocksFile)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
blockNumber, err := strconv.ParseInt(scanner.Text(), 10, 64)
if err != nil {
return err
}
blockChan <- blockNumber
}
} else {
startHeight, err := cmd.Flags().GetInt(FlagStartBlock)
if err != nil {
return err
}
endHeight, err := cmd.Flags().GetInt(FlagEndBlock)
if err != nil {
return err
}
if startHeight < 1 {
return fmt.Errorf("invalid start-block: %d", startHeight)
}
if endHeight < startHeight {
return fmt.Errorf("invalid end-block %d, smaller than start-block", endHeight)
}

for height := startHeight; height <= endHeight; height++ {
blockChan <- int64(height)
}
}
return nil
}

go func() {
if err := findBlock(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
close(blockChan)
}()

wg.Wait()

return ctCtx.Err()
},
}
cmd.Flags().String(flags.FlagChainID, "cronosmainnet_25-1", "network chain ID, only useful for psql tx indexer backend")
cmd.Flags().Bool(FlagPrintTxs, false, "Print the block number and tx indexes of the duplicated txs without patch")
cmd.Flags().String(FlagBlocksFile, "", "Read block numbers from a file instead of iterating all the blocks")
cmd.Flags().Int(FlagStartBlock, 1, "The start of the block range to iterate, inclusive")
cmd.Flags().Int(FlagEndBlock, -1, "The end of the block range to iterate, inclusive")
cmd.Flags().Int(FlagConcurrency, runtime.NumCPU(), "Define how many workers run in concurrency")

return cmd
}
1 change: 1 addition & 0 deletions cmd/cronosd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) {
txCommand(),
ethermintclient.KeyCommands(app.DefaultNodeHome),
FixUnluckyTxCmd(),
ReindexDuplicatedTx(),
)

// add rosetta
Expand Down