Skip to content

Commit

Permalink
Remove duplicates from offchain data (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman authored Aug 19, 2024
1 parent bd6eed4 commit b48599a
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 40 deletions.
3 changes: 3 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) {
func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface{}) {
const columnsAffected = 3

// Remove duplicates from the given offchain data
ods = types.RemoveDuplicateOffChainData(ods)

args := make([]interface{}, len(ods)*columnsAffected)
values := make([]string, len(ods))
for i, od := range ods {
Expand Down
58 changes: 29 additions & 29 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,26 +175,26 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) {
name: "one value inserted",
bk: []types.BatchKey{{
Number: 1,
Hash: common.HexToHash("key1"),
Hash: common.BytesToHash([]byte("key1")),
}},
expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`,
},
{
name: "several values inserted",
bk: []types.BatchKey{{
Number: 1,
Hash: common.HexToHash("key1"),
Hash: common.BytesToHash([]byte("key1")),
}, {
Number: 2,
Hash: common.HexToHash("key2"),
Hash: common.BytesToHash([]byte("key2")),
}},
expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2),($3, $4) ON CONFLICT (num, hash) DO NOTHING`,
},
{
name: "error returned",
bk: []types.BatchKey{{
Number: 1,
Hash: common.HexToHash("key1"),
Hash: common.BytesToHash([]byte("key1")),
}},
expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`,
returnErr: errors.New("test error"),
Expand Down Expand Up @@ -262,14 +262,14 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) {
name: "successfully selected data",
bks: []types.BatchKey{{
Number: 1,
Hash: common.HexToHash("key1"),
Hash: common.BytesToHash([]byte("key1")),
}},
},
{
name: "error returned",
bks: []types.BatchKey{{
Number: 1,
Hash: common.HexToHash("key1"),
Hash: common.BytesToHash([]byte("key1")),
}},
returnErr: errors.New("test error"),
},
Expand Down Expand Up @@ -332,26 +332,26 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) {
name: "value deleted",
bks: []types.BatchKey{{
Number: 1,
Hash: common.HexToHash("key1"),
Hash: common.BytesToHash([]byte("key1")),
}},
expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2))`,
},
{
name: "multiple values deleted",
bks: []types.BatchKey{{
Number: 1,
Hash: common.HexToHash("key1"),
Hash: common.BytesToHash([]byte("key1")),
}, {
Number: 2,
Hash: common.HexToHash("key2"),
Hash: common.BytesToHash([]byte("key2")),
}},
expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2),($3, $4))`,
},
{
name: "error returned",
bks: []types.BatchKey{{
Number: 1,
Hash: common.HexToHash("key1"),
Hash: common.BytesToHash([]byte("key1")),
}},
expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2))`,
returnErr: errors.New("test error"),
Expand Down Expand Up @@ -416,26 +416,26 @@ func Test_DB_StoreOffChainData(t *testing.T) {
{
name: "one value inserted",
ods: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`,
},
{
name: "several values inserted",
ods: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}, {
Key: common.HexToHash("key2"),
Key: common.BytesToHash([]byte("key2")),
Value: []byte("value2"),
}},
expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3),($4, $5, $6) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`,
},
{
name: "error returned",
ods: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`,
Expand Down Expand Up @@ -499,21 +499,21 @@ func Test_DB_GetOffChainData(t *testing.T) {
{
name: "successfully selected value",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
BatchNum: 1,
}},
key: common.BytesToHash([]byte("key1")),
expected: &types.OffChainData{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
BatchNum: 1,
},
},
{
name: "error returned",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
key: common.BytesToHash([]byte("key1")),
Expand All @@ -522,7 +522,7 @@ func Test_DB_GetOffChainData(t *testing.T) {
{
name: "no rows",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
key: common.BytesToHash([]byte("undefined")),
Expand Down Expand Up @@ -587,7 +587,7 @@ func Test_DB_ListOffChainData(t *testing.T) {
{
name: "successfully selected one value",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
keys: []common.Hash{
Expand All @@ -605,11 +605,11 @@ func Test_DB_ListOffChainData(t *testing.T) {
{
name: "successfully selected two values",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
BatchNum: 1,
}, {
Key: common.HexToHash("key2"),
Key: common.BytesToHash([]byte("key2")),
Value: []byte("value2"),
BatchNum: 2,
}},
Expand All @@ -634,7 +634,7 @@ func Test_DB_ListOffChainData(t *testing.T) {
{
name: "error returned",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
keys: []common.Hash{
Expand All @@ -646,7 +646,7 @@ func Test_DB_ListOffChainData(t *testing.T) {
{
name: "no rows",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
keys: []common.Hash{
Expand Down Expand Up @@ -722,10 +722,10 @@ func Test_DB_CountOffchainData(t *testing.T) {
{
name: "two values found",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}, {
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value2"),
}},
count: 2,
Expand All @@ -737,7 +737,7 @@ func Test_DB_CountOffchainData(t *testing.T) {
{
name: "error returned",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
returnErr: errors.New("test error"),
Expand Down Expand Up @@ -797,11 +797,11 @@ func Test_DB_DetectOffchainDataGaps(t *testing.T) {
{
name: "one gap found",
seed: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
BatchNum: 1,
}, {
Key: common.HexToHash("key2"),
Key: common.BytesToHash([]byte("key2")),
Value: []byte("value2"),
BatchNum: 2,
}, {
Expand All @@ -816,7 +816,7 @@ func Test_DB_DetectOffchainDataGaps(t *testing.T) {
{
name: "error returned",
seed: []types.OffChainData{{
Key: common.HexToHash("key1"),
Key: common.BytesToHash([]byte("key1")),
Value: []byte("value1"),
}},
returnErr: errors.New("test error"),
Expand Down
4 changes: 2 additions & 2 deletions pkg/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package backoff
import "time"

// Exponential performs exponential backoff attempts on a given action
func Exponential(action func() error, max uint, wait time.Duration) error {
func Exponential(action func() error, attempts uint, wait time.Duration) error {
var err error
for i := uint(0); i < max; i++ {
for i := uint(0); i < attempts; i++ {
if err = action(); err == nil {
return nil
}
Expand Down
38 changes: 29 additions & 9 deletions services/datacom/datacom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@ func TestDataCom_SignSequence(t *testing.T) {
storeOffChainDataReturns []interface{}
sender *ecdsa.PrivateKey
signer *ecdsa.PrivateKey
sequence types.Sequence
expectedError string
}

sequence := types.Sequence{
types.ArgBytes([]byte{0, 1}),
types.ArgBytes([]byte{2, 3}),
}

privateKey, err := crypto.GenerateKey()
require.NoError(t, err)

Expand All @@ -51,7 +47,7 @@ func TestDataCom_SignSequence(t *testing.T) {
dbMock := mocks.NewDB(t)

if len(cfg.storeOffChainDataReturns) > 0 {
dbMock.On("StoreOffChainData", mock.Anything, sequence.OffChainData()).Return(
dbMock.On("StoreOffChainData", mock.Anything, cfg.sequence.OffChainData()).Return(
cfg.storeOffChainDataReturns...).Once()
}

Expand All @@ -68,15 +64,15 @@ func TestDataCom_SignSequence(t *testing.T) {
sqr.Start(context.Background())

if cfg.sender != nil {
signature, err := sequence.Sign(cfg.sender)
signature, err := cfg.sequence.Sign(cfg.sender)
require.NoError(t, err)
signedSequence = &types.SignedSequence{
Sequence: sequence,
Sequence: cfg.sequence,
Signature: signature,
}
} else {
signedSequence = &types.SignedSequence{
Sequence: sequence,
Sequence: cfg.sequence,
Signature: []byte{},
}
}
Expand Down Expand Up @@ -106,6 +102,10 @@ func TestDataCom_SignSequence(t *testing.T) {

testFn(t, testConfig{
expectedError: "failed to verify sender",
sequence: types.Sequence{
types.ArgBytes{0, 1},
types.ArgBytes{2, 3},
},
})
})

Expand All @@ -115,6 +115,10 @@ func TestDataCom_SignSequence(t *testing.T) {
testFn(t, testConfig{
sender: privateKey,
expectedError: "unauthorized",
sequence: types.Sequence{
types.ArgBytes{0, 1},
types.ArgBytes{2, 3},
},
})
})

Expand All @@ -124,6 +128,10 @@ func TestDataCom_SignSequence(t *testing.T) {
testFn(t, testConfig{
sender: privateKey,
expectedError: "unauthorized",
sequence: types.Sequence{
types.ArgBytes{0, 1},
types.ArgBytes{2, 3},
},
})
})

Expand All @@ -134,6 +142,10 @@ func TestDataCom_SignSequence(t *testing.T) {
sender: otherPrivateKey,
expectedError: "failed to store offchain data",
storeOffChainDataReturns: []interface{}{errors.New("error")},
sequence: types.Sequence{
types.ArgBytes{0, 1},
types.ArgBytes{2, 3},
},
})
})

Expand All @@ -150,6 +162,10 @@ func TestDataCom_SignSequence(t *testing.T) {
signer: key,
storeOffChainDataReturns: []interface{}{nil},
expectedError: "failed to sign",
sequence: types.Sequence{
types.ArgBytes{0, 1},
types.ArgBytes{2, 3},
},
})
})

Expand All @@ -159,6 +175,10 @@ func TestDataCom_SignSequence(t *testing.T) {
testFn(t, testConfig{
sender: otherPrivateKey,
storeOffChainDataReturns: []interface{}{nil},
sequence: types.Sequence{
types.ArgBytes{0, 1},
types.ArgBytes{2, 3},
},
})
})
}
13 changes: 13 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ type OffChainData struct {
BatchNum uint64
}

// RemoveDuplicateOffChainData removes duplicate off chain data
func RemoveDuplicateOffChainData(ods []OffChainData) []OffChainData {
seen := make(map[common.Hash]struct{})
result := []OffChainData{}
for _, od := range ods {
if _, ok := seen[od.Key]; !ok {
seen[od.Key] = struct{}{}
result = append(result, od)
}
}
return result
}

// ArgUint64 helps to marshal uint64 values provided in the RPC requests
type ArgUint64 uint64

Expand Down
Loading

0 comments on commit b48599a

Please sign in to comment.