Skip to content

Commit

Permalink
Merge pull request #3611 from filecoin-project/fix/tscache-nil-best
Browse files Browse the repository at this point in the history
chain events: if cache best() is nil, return chain head
  • Loading branch information
magik6k authored Sep 7, 2020
2 parents 3fe8cde + 76a1b32 commit d3e5092
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 27 deletions.
3 changes: 2 additions & 1 deletion chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type eventAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)

Expand All @@ -57,7 +58,7 @@ type Events struct {
func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold

tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight)
tsc := newTSCache(gcConfidence, api)

e := &Events{
api: api,
Expand Down
5 changes: 4 additions & 1 deletion chain/events/events_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,10 @@ func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHa
defer e.lk.Unlock()

// Check if the event has already occurred
ts := e.tsc.best()
ts, err := e.tsc.best()
if err != nil {
return 0, xerrors.Errorf("error getting best tipset: %w", err)
}
done, more, err := check(ts)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
Expand Down
14 changes: 12 additions & 2 deletions chain/events/events_height.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"

"golang.org/x/xerrors"

"github.com/filecoin-project/specs-actors/actors/abi"
"go.opencensus.io/trace"

Expand Down Expand Up @@ -152,8 +154,12 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence

e.lk.Lock() // Tricky locking, check your locks if you modify this function!

bestH := e.tsc.best().Height()
best, err := e.tsc.best()
if err != nil {
return xerrors.Errorf("error getting best tipset: %w", err)
}

bestH := best.Height()
if bestH >= h+abi.ChainEpoch(confidence) {
ts, err := e.tsc.getNonNull(h)
if err != nil {
Expand All @@ -172,7 +178,11 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence
}

e.lk.Lock()
bestH = e.tsc.best().Height()
best, err = e.tsc.best()
if err != nil {
return xerrors.Errorf("error getting best tipset: %w", err)
}
bestH = best.Height()
}

defer e.lk.Unlock()
Expand Down
17 changes: 14 additions & 3 deletions chain/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type fakeCS struct {
sub func(rev, app []*types.TipSet)
}

func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) {
panic("implement me")
}

func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
return fcs.tipsets[key], nil
}
Expand Down Expand Up @@ -110,7 +114,11 @@ func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msg

func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) {
out := make(chan []*api.HeadChange, 1)
out <- []*api.HeadChange{{Type: store.HCCurrent, Val: fcs.tsc.best()}}
best, err := fcs.tsc.best()
if err != nil {
return nil, err
}
out <- []*api.HeadChange{{Type: store.HCCurrent, Val: best}}

fcs.sub = func(rev, app []*types.TipSet) {
notif := make([]*api.HeadChange, len(rev)+len(app))
Expand Down Expand Up @@ -174,7 +182,8 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /

var revs []*types.TipSet
for i := 0; i < rev; i++ {
ts := fcs.tsc.best()
ts, err := fcs.tsc.best()
require.NoError(fcs.t, err)

if _, ok := nullm[int(ts.Height())]; !ok {
revs = append(revs, ts)
Expand All @@ -196,7 +205,9 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
continue
}

ts := fcs.makeTs(fcs.t, fcs.tsc.best().Key().Cids(), fcs.h, mc)
best, err := fcs.tsc.best()
require.NoError(fcs.t, err)
ts := fcs.makeTs(fcs.t, best.Key().Cids(), fcs.h, mc)
require.NoError(fcs.t, fcs.tsc.add(ts))

if hasMsgs {
Expand Down
21 changes: 14 additions & 7 deletions chain/events/tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

type tsByHFunc func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
type tsCacheAPI interface {
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
}

// tipSetCache implements a simple ring-buffer cache to keep track of recent
// tipsets
Expand All @@ -18,10 +21,10 @@ type tipSetCache struct {
start int
len int

storage tsByHFunc
storage tsCacheAPI
}

func newTSCache(cap abi.ChainEpoch, storage tsByHFunc) *tipSetCache {
func newTSCache(cap abi.ChainEpoch, storage tsCacheAPI) *tipSetCache {
return &tipSetCache{
cache: make([]*types.TipSet, cap),
start: 0,
Expand Down Expand Up @@ -94,7 +97,7 @@ func (tsc *tipSetCache) getNonNull(height abi.ChainEpoch) (*types.TipSet, error)
func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
if tsc.len == 0 {
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
return tsc.storage(context.TODO(), height, types.EmptyTSK)
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, types.EmptyTSK)
}

headH := tsc.cache[tsc.start].Height()
Expand All @@ -114,14 +117,18 @@ func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {

if height < tail.Height() {
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
return tsc.storage(context.TODO(), height, tail.Key())
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, tail.Key())
}

return tsc.cache[normalModulo(tsc.start-int(headH-height), clen)], nil
}

func (tsc *tipSetCache) best() *types.TipSet {
return tsc.cache[tsc.start]
func (tsc *tipSetCache) best() (*types.TipSet, error) {
best := tsc.cache[tsc.start]
if best == nil {
return tsc.storage.ChainHead(context.TODO())
}
return best, nil
}

func normalModulo(n, m int) int {
Expand Down
72 changes: 59 additions & 13 deletions chain/events/tscache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ import (
)

func TestTsCache(t *testing.T) {
tsc := newTSCache(50, func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) {
t.Fatal("storage call")
return &types.TipSet{}, nil
})
tsc := newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t})

h := abi.ChainEpoch(75)

Expand All @@ -43,7 +40,12 @@ func TestTsCache(t *testing.T) {

for i := 0; i < 9000; i++ {
if i%90 > 60 {
if err := tsc.revert(tsc.best()); err != nil {
best, err := tsc.best()
if err != nil {
t.Fatal(err, "; i:", i)
return
}
if err := tsc.revert(best); err != nil {
t.Fatal(err, "; i:", i)
return
}
Expand All @@ -55,11 +57,21 @@ func TestTsCache(t *testing.T) {

}

type tsCacheAPIFailOnStorageCall struct {
t *testing.T
}

func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIFailOnStorageCall) ChainHead(ctx context.Context) (*types.TipSet, error) {
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
}

func TestTsCacheNulls(t *testing.T) {
tsc := newTSCache(50, func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) {
t.Fatal("storage call")
return &types.TipSet{}, nil
})
tsc := newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t})

h := abi.ChainEpoch(75)

Expand Down Expand Up @@ -91,7 +103,9 @@ func TestTsCacheNulls(t *testing.T) {
add()
add()

require.Equal(t, h-1, tsc.best().Height())
best, err := tsc.best()
require.NoError(t, err)
require.Equal(t, h-1, best.Height())

ts, err := tsc.get(h - 1)
require.NoError(t, err)
Expand All @@ -109,9 +123,17 @@ func TestTsCacheNulls(t *testing.T) {
require.NoError(t, err)
require.Equal(t, h-8, ts.Height())

require.NoError(t, tsc.revert(tsc.best()))
require.NoError(t, tsc.revert(tsc.best()))
require.Equal(t, h-8, tsc.best().Height())
best, err = tsc.best()
require.NoError(t, err)
require.NoError(t, tsc.revert(best))

best, err = tsc.best()
require.NoError(t, err)
require.NoError(t, tsc.revert(best))

best, err = tsc.best()
require.NoError(t, err)
require.Equal(t, h-8, best.Height())

h += 50
add()
Expand All @@ -120,3 +142,27 @@ func TestTsCacheNulls(t *testing.T) {
require.NoError(t, err)
require.Equal(t, h-1, ts.Height())
}

type tsCacheAPIStorageCallCounter struct {
t *testing.T
chainGetTipSetByHeight int
chainHead int
}

func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.chainGetTipSetByHeight++
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIStorageCallCounter) ChainHead(ctx context.Context) (*types.TipSet, error) {
tc.chainHead++
return &types.TipSet{}, nil
}

func TestTsCacheEmpty(t *testing.T) {
// Calling best on an empty cache should just call out to the chain API
callCounter := &tsCacheAPIStorageCallCounter{t: t}
tsc := newTSCache(50, callCounter)
_, err := tsc.best()
require.NoError(t, err)
require.Equal(t, 1, callCounter.chainHead)
}

0 comments on commit d3e5092

Please sign in to comment.