Commit
wip
arnaubennassar committed Aug 28, 2024
1 parent 51f0bb4 commit 6cd5cd7
Showing 4 changed files with 73 additions and 79 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -31,3 +31,4 @@ coverage.out
db/db-packr.go
db/packrd/
packrd/
nohup.out
1 change: 1 addition & 0 deletions cmd/run.go
@@ -753,6 +753,7 @@ func forkIDIntervals(ctx context.Context, st *state.State, etherman *etherman.Cl
if err != nil && !errors.Is(err, state.ErrStateNotSynchronized) {
return []state.ForkIDInterval{}, fmt.Errorf("error checking lastL1BlockSynced. Error: %v", err)
}
// If lastBlock is below genesisBlock it means state.ErrStateNotSynchronized (the sync process hasn't started yet; it is still doing the pre-genesis sync)
if lastBlock != nil && lastBlock.BlockNumber >= genesisBlockNumber {
log.Info("Getting forkIDs intervals. Please wait...")
// Read Fork ID FROM POE SC
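The added comment documents the guard in forkIDIntervals: forkIDs are only read from the smart contract once the last synced L1 block has reached the rollup genesis block; anything below that means the pre-genesis sync is still in progress. A minimal, self-contained sketch of that guard follows (every name here is a hypothetical stand-in, not the node's real API):

package main

import (
	"errors"
	"fmt"
)

var errStateNotSynchronized = errors.New("state not synchronized")

type block struct{ BlockNumber uint64 }

// shouldQueryForkIDs mirrors the condition above: only read forkID intervals from the
// smart contract once the last synced L1 block has reached the rollup genesis block.
func shouldQueryForkIDs(lastBlock *block, getErr error, genesisBlockNumber uint64) (bool, error) {
	if getErr != nil && !errors.Is(getErr, errStateNotSynchronized) {
		return false, fmt.Errorf("error checking lastL1BlockSynced: %w", getErr)
	}
	// A nil lastBlock, or one below genesis, means the pre-genesis sync is still running.
	return lastBlock != nil && lastBlock.BlockNumber >= genesisBlockNumber, nil
}

func main() {
	ok, _ := shouldQueryForkIDs(&block{BlockNumber: 100}, nil, 50)
	fmt.Println(ok) // true: safe to read forkID intervals from the smart contract
}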
5 changes: 5 additions & 0 deletions state/block.go
@@ -1,6 +1,7 @@
package state

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -19,3 +20,7 @@ type Block struct {
func NewBlock(blockNumber uint64) *Block {
return &Block{BlockNumber: blockNumber}
}

func (b *Block) String() string {
return fmt.Sprintf("BlockNumber: %d, BlockHash: %s, ParentHash: %s, ReceivedAt: %s", b.BlockNumber, b.BlockHash, b.ParentHash, b.ReceivedAt)
}
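The new String() method gives state.Block a readable form for log lines such as the log.Debugf call added in the synchronizer below. A hedged usage sketch with made-up values (this local Block type stands in for the real one, which uses common.Hash for the hashes):

package main

import (
	"fmt"
	"time"
)

// Block mirrors the fields the new String() prints; the real struct lives in state/block.go.
type Block struct {
	BlockNumber uint64
	BlockHash   string
	ParentHash  string
	ReceivedAt  time.Time
}

func (b *Block) String() string {
	return fmt.Sprintf("BlockNumber: %d, BlockHash: %s, ParentHash: %s, ReceivedAt: %s",
		b.BlockNumber, b.BlockHash, b.ParentHash, b.ReceivedAt)
}

func main() {
	b := &Block{BlockNumber: 42, BlockHash: "0xabc", ParentHash: "0xdef", ReceivedAt: time.Now()}
	fmt.Printf("Storing block. Block: %s\n", b) // same shape as the log.Debugf call in the synchronizer
}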
145 changes: 66 additions & 79 deletions synchronizer/synchronizer.go
@@ -635,35 +635,28 @@ func (s *ClientSynchronizer) RequestAndProcessRollupGenesisBlock(dbTx pgx.Tx, la
if err != nil {
return err
}
forkId := s.state.GetForkIDByBlockNumber(blocks[0].BlockNumber)
err = s.l1EventProcessors.Process(s.ctx, actions.ForkIdType(forkId), etherman.Order{Name: etherman.ForkIDsOrder, Pos: 0}, &blocks[0], dbTx)
err = s.internalProcessBlock(blocks[0], order[blocks[0].BlockHash], false, dbTx)
if err != nil {
log.Error("error storing genesis forkID: ", err)
return err
}
if len(blocks[0].SequencedBatches) != 0 {
batchSequence := l1event_orders.GetSequenceFromL1EventOrder(etherman.InitialSequenceBatchesOrder, &blocks[0], 0)
forkId = s.state.GetForkIDByBatchNumber(batchSequence.FromBatchNumber)
err = s.l1EventProcessors.Process(s.ctx, actions.ForkIdType(forkId), etherman.Order{Name: etherman.InitialSequenceBatchesOrder, Pos: 0}, &blocks[0], dbTx)
if err != nil {
log.Error("error storing initial tx (batch 1): ", err)
return err
}
log.Errorf("error processinge events on genesis block %d: err:%w", lastEthBlockSynced.BlockNumber, err)
}
return nil
}

func sanityCheckForGenesisBlockRollupInfo(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error {
if len(blocks) != 1 || len(order) < 1 || len(order[blocks[0].BlockHash]) < 1 {
log.Errorf("error getting rollupInfoByBlockRange after set the genesis. Expected 1 block with 2 orders")
return fmt.Errorf("error getting rollupInfoByBlockRange after set the genesis. Expected 1 block with 2 orders")
log.Errorf("error getting rollupInfoByBlockRange after set the genesis. Expected 1 block with minimum 2 orders")
return fmt.Errorf("error getting rollupInfoByBlockRange after set the genesis. Expected 1 block with minimum 2 orders")
}
if order[blocks[0].BlockHash][0].Name != etherman.ForkIDsOrder {
log.Errorf("error getting rollupInfoByBlockRange after set the genesis. Expected ForkIDsOrder, got %s", order[blocks[0].BlockHash][0].Name)
return fmt.Errorf("error getting rollupInfoByBlockRange after set the genesis. Expected ForkIDsOrder")
// The genesis block must include at least one ForkIDsOrder event
for _, value := range order[blocks[0].BlockHash] {
if value.Name == etherman.ForkIDsOrder {
return nil
}
}

return nil
err := fmt.Errorf("events on genesis block (%d) need a ForkIDsOrder event but this block got %+v",
blocks[0].BlockNumber, order[blocks[0].BlockHash])
log.Error(err.Error())
return err
}

// This function syncs the node from a specific block to the latest
@@ -796,81 +789,27 @@ func removeBlockElement(slice []etherman.Block, s int) []etherman.Block {

// ProcessBlockRange processes the L1 events and stores the information in the db
func (s *ClientSynchronizer) ProcessBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error {
// Check the latest finalized block in L1
finalizedBlockNumber, err := s.etherMan.GetFinalizedBlockNumber(s.ctx)
if err != nil {
log.Errorf("error getting finalized block number in L1. Error: %v", err)
return err
}
// New info has to be included into the db using the state
for i := range blocks {
// Begin db transaction
dbTx, err := s.state.BeginStateTransaction(s.ctx)
if err != nil {
log.Errorf("error creating db transaction to store block. BlockNumber: %d, error: %v", blocks[i].BlockNumber, err)
return err
}
b := state.Block{
BlockNumber: blocks[i].BlockNumber,
BlockHash: blocks[i].BlockHash,
ParentHash: blocks[i].ParentHash,
ReceivedAt: blocks[i].ReceivedAt,
}
if blocks[i].BlockNumber <= finalizedBlockNumber {
b.Checked = true
}
// Add block information
err = s.state.AddBlock(s.ctx, &b, dbTx)
err = s.internalProcessBlock(blocks[i], order[blocks[i].BlockHash], true, dbTx)
if err != nil {
log.Error("rollingback BlockNumber: %d, because error internalProcessBlock: ", blocks[i].BlockNumber, err)
// If any goes wrong we ensure that the state is rollbacked
log.Errorf("error storing block. BlockNumber: %d, error: %v", blocks[i].BlockNumber, err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %v", blocks[i].BlockNumber, rollbackErr.Error(), err)
return rollbackErr
if rollbackErr != nil && !errors.Is(rollbackErr, pgx.ErrTxClosed) {
log.Warnf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %v", blocks[i].BlockNumber, rollbackErr.Error(), err)
return fmt.Errorf("error rollback BlockNumber: %d: err:%w original error:%w", blocks[i].BlockNumber, rollbackErr, err)
}
return err
}

for _, element := range order[blocks[i].BlockHash] {
batchSequence := l1event_orders.GetSequenceFromL1EventOrder(element.Name, &blocks[i], element.Pos)
var forkId uint64
if batchSequence != nil {
forkId = s.state.GetForkIDByBatchNumber(batchSequence.FromBatchNumber)
log.Debug("EventOrder: ", element.Name, ". Batch Sequence: ", batchSequence, "forkId: ", forkId)
} else {
forkId = s.state.GetForkIDByBlockNumber(blocks[i].BlockNumber)
log.Debug("EventOrder: ", element.Name, ". BlockNumber: ", blocks[i].BlockNumber, ". forkId: ", forkId)
}
forkIdTyped := actions.ForkIdType(forkId)
// Process event received from l1
err := s.l1EventProcessors.Process(s.ctx, forkIdTyped, element, &blocks[i], dbTx)
if err != nil {
log.Error("error: ", err)
// If any goes wrong we ensure that the state is rollbacked
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil && !errors.Is(rollbackErr, pgx.ErrTxClosed) {
log.Warnf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %v", blocks[i].BlockNumber, rollbackErr.Error(), err)
return rollbackErr
}
return err
}
}
log.Debug("Checking FlushID to commit L1 data to db")
err = s.checkFlushID(dbTx)
if err != nil {
// If any goes wrong we ensure that the state is rollbacked
log.Errorf("error checking flushID. Error: %v", err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state. RollbackErr: %s, Error : %v", rollbackErr.Error(), err)
return rollbackErr
}
return err
}
err = dbTx.Commit(s.ctx)
if err != nil {
// If any goes wrong we ensure that the state is rollbacked
log.Errorf("error committing state to store block. BlockNumber: %d, err: %v", blocks[i].BlockNumber, err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
@@ -883,6 +822,54 @@ func (s *ClientSynchronizer) ProcessBlockRange(blocks []etherman.Block, order ma
return nil
}

// internalProcessBlock stores the given block (when addBlock is true) and processes its L1 events using the provided dbTx
func (s *ClientSynchronizer) internalProcessBlock(blocks etherman.Block, order []etherman.Order, addBlock bool, dbTx pgx.Tx) error {
var err error
// New info has to be included into the db using the state
if addBlock {
b := state.Block{
BlockNumber: blocks.BlockNumber,
BlockHash: blocks.BlockHash,
ParentHash: blocks.ParentHash,
ReceivedAt: blocks.ReceivedAt,
}
// Add block information
log.Debugf("Storing block. Block: %s", b.String())
err = s.state.AddBlock(s.ctx, &b, dbTx)
if err != nil {
log.Errorf("error storing block. BlockNumber: %d, error: %v", blocks.BlockNumber, err)
return err
}
}

for _, element := range order {
batchSequence := l1event_orders.GetSequenceFromL1EventOrder(element.Name, &blocks, element.Pos)
var forkId uint64
if batchSequence != nil {
forkId = s.state.GetForkIDByBatchNumber(batchSequence.FromBatchNumber)
log.Debug("EventOrder: ", element.Name, ". Batch Sequence: ", batchSequence, "forkId: ", forkId)
} else {
forkId = s.state.GetForkIDByBlockNumber(blocks.BlockNumber)
log.Debug("EventOrder: ", element.Name, ". BlockNumber: ", blocks.BlockNumber, "forkId: ", forkId)
}
forkIdTyped := actions.ForkIdType(forkId)
// Process event received from l1
err := s.l1EventProcessors.Process(s.ctx, forkIdTyped, element, &blocks, dbTx)
if err != nil {
log.Error("1EventProcessors.Process error: ", err)
return err
}
}
log.Debug("Checking FlushID to commit L1 data to db")
err = s.checkFlushID(dbTx)
if err != nil {
log.Errorf("error checking flushID. Error: %v", err)
return err
}

return nil
}

func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {
if s.syncTrustedStateExecutor == nil {
return nil
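In the new layout, ProcessBlockRange owns the db transaction for each block (begin, commit, rollback on error), while internalProcessBlock only stores the block and processes its ordered L1 events through the dbTx it receives. A hedged sketch of that ownership pattern, using hypothetical types rather than the node's real API:

package main

import (
	"errors"
	"fmt"
)

type tx struct{ done bool }

func (t *tx) Commit() error   { t.done = true; return nil }
func (t *tx) Rollback() error { t.done = true; return nil }

type block struct{ Number uint64 }

// processBlock stands in for internalProcessBlock: it writes through the transaction
// it is handed and never commits or rolls back on its own.
func processBlock(b block, dbTx *tx) error {
	if b.Number == 0 {
		return errors.New("invalid block")
	}
	// ... store block and process its L1 events through dbTx ...
	return nil
}

// processRange stands in for ProcessBlockRange: one transaction per block,
// rollback when the helper fails, commit otherwise.
func processRange(blocks []block) error {
	for _, b := range blocks {
		dbTx := &tx{}
		if err := processBlock(b, dbTx); err != nil {
			if rbErr := dbTx.Rollback(); rbErr != nil {
				return fmt.Errorf("rollback of block %d failed: %w (original error: %v)", b.Number, rbErr, err)
			}
			return err
		}
		if err := dbTx.Commit(); err != nil {
			return fmt.Errorf("commit of block %d failed: %w", b.Number, err)
		}
	}
	return nil
}

func main() {
	fmt.Println(processRange([]block{{Number: 1}, {Number: 2}}))
}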
