Skip to content

Commit

Permalink
limit the amount of unprocessed batch keys at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
christophercampbell committed Jun 19, 2024
1 parent 4c27e9a commit 3133609
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 29 deletions.
8 changes: 4 additions & 4 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type DB interface {
GetLastProcessedBlock(ctx context.Context, task string) (uint64, error)

StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error
GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error)
GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error)
DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error

Exists(ctx context.Context, key common.Hash) bool
Expand Down Expand Up @@ -113,10 +113,10 @@ func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchK
}

// GetUnresolvedBatchKeys returns the unresolved batch keys from the database
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) {
const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches;"
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) {
const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;"

rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL)
rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL, limit)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) {
// Seed data
seedUnresolvedBatchKeys(t, wdb, mock, tt.bks)

expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.unresolved_batches`)
var limit = uint(10)
expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.unresolved_batches LIMIT \$1\;`).WithArgs(limit)

if tt.returnErr != nil {
expected.WillReturnError(tt.returnErr)
Expand All @@ -256,7 +257,7 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) {

dbPG := New(wdb)

data, err := dbPG.GetUnresolvedBatchKeys(context.Background())
data, err := dbPG.GetUnresolvedBatchKeys(context.Background(), limit)
if tt.returnErr != nil {
require.ErrorIs(t, err, tt.returnErr)
} else {
Expand Down
29 changes: 15 additions & 14 deletions mocks/db.generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions synchronizer/batches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
t.Parallel()

testFn(t, testConfig{
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything},
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)},
getUnresolvedBatchKeysReturns: []interface{}{nil, errors.New("error")},
isErrorExpected: true,
})
Expand All @@ -652,7 +652,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
t.Parallel()

testFn(t, testConfig{
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything},
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)},
getUnresolvedBatchKeysReturns: []interface{}{nil, nil},
isErrorExpected: false,
})
Expand All @@ -662,7 +662,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
t.Parallel()

testFn(t, testConfig{
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything},
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)},
getUnresolvedBatchKeysReturns: []interface{}{
[]types.BatchKey{{
Number: 10,
Expand Down Expand Up @@ -690,7 +690,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
t.Parallel()

testFn(t, testConfig{
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything},
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)},
getUnresolvedBatchKeysReturns: []interface{}{
[]types.BatchKey{{
Number: 10,
Expand Down
5 changes: 3 additions & 2 deletions synchronizer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
)

const (
initBlockTimeout = 15 * time.Second
minCodeLen = 2
initBlockTimeout = 15 * time.Second
minCodeLen = 2
maxUnprocessedBatch = 100
)

// InitStartBlock initializes the L1 sync task by finding the inception block for the CDKValidium contract
Expand Down
2 changes: 1 addition & 1 deletion synchronizer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func getUnresolvedBatchKeys(db dbTypes.DB) ([]types.BatchKey, error) {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()

return db.GetUnresolvedBatchKeys(ctx)
return db.GetUnresolvedBatchKeys(ctx, maxUnprocessedBatch)
}

func deleteUnresolvedBatchKeys(db dbTypes.DB, keys []types.BatchKey) error {
Expand Down
4 changes: 2 additions & 2 deletions synchronizer/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) {
db: func(t *testing.T) db.DB {
mockDB := mocks.NewDB(t)

mockDB.On("GetUnresolvedBatchKeys", mock.Anything).
mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)).
Return(nil, testError)

return mockDB
Expand All @@ -322,7 +322,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) {
db: func(t *testing.T) db.DB {
mockDB := mocks.NewDB(t)

mockDB.On("GetUnresolvedBatchKeys", mock.Anything).Return(testData, nil)
mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)).Return(testData, nil)

return mockDB
},
Expand Down

0 comments on commit 3133609

Please sign in to comment.