Skip to content

Commit

Permalink
Merge pull request #1534 from anyproto/GO-4012-indexer-tx
Browse files Browse the repository at this point in the history
GO-4012 indexer: batch tx
  • Loading branch information
cheggaaa authored Sep 9, 2024
2 parents 6278134 + d7a9d3c commit 6713d40
Show file tree
Hide file tree
Showing 20 changed files with 394 additions and 189 deletions.
10 changes: 5 additions & 5 deletions core/block/export/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Test_docsForExport(t *testing.T) {
bundle.RelationKeyName: pbtypes.String("name2"),
},
})
err := storeFixture.UpdateObjectLinks("id", []string{"id1"})
err := storeFixture.UpdateObjectLinks(context.Background(), "id", []string{"id1"})
assert.Nil(t, err)

provider := mock_typeprovider.NewMockSmartBlockTypeProvider(t)
Expand Down Expand Up @@ -95,7 +95,7 @@ func Test_docsForExport(t *testing.T) {
bundle.RelationKeyIsDeleted: pbtypes.Bool(true),
},
})
err := storeFixture.UpdateObjectLinks("id", []string{"id1"})
err := storeFixture.UpdateObjectLinks(context.Background(), "id", []string{"id1"})
assert.Nil(t, err)

provider := mock_typeprovider.NewMockSmartBlockTypeProvider(t)
Expand Down Expand Up @@ -127,7 +127,7 @@ func Test_docsForExport(t *testing.T) {
bundle.RelationKeyType: pbtypes.String("objectType"),
},
})
err := storeFixture.UpdateObjectLinks("id", []string{"id1"})
err := storeFixture.UpdateObjectLinks(context.Background(), "id", []string{"id1"})
assert.Nil(t, err)

objectGetter := mock_cache.NewMockObjectGetter(t)
Expand Down Expand Up @@ -184,7 +184,7 @@ func Test_docsForExport(t *testing.T) {
},
})

err = storeFixture.UpdateObjectLinks("id", []string{"id1"})
err = storeFixture.UpdateObjectLinks(context.Background(), "id", []string{"id1"})
assert.Nil(t, err)

objectGetter := mock_cache.NewMockObjectGetter(t)
Expand Down Expand Up @@ -404,7 +404,7 @@ func Test_docsForExport(t *testing.T) {
},
})

err = storeFixture.UpdateObjectLinks(templateId, []string{linkedObjectId})
err = storeFixture.UpdateObjectLinks(context.Background(), templateId, []string{linkedObjectId})
assert.Nil(t, err)

objectGetter := mock_cache.NewMockObjectGetter(t)
Expand Down
12 changes: 2 additions & 10 deletions core/indexer/fulltext.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,7 @@ func (i *indexer) ForceFTIndex() {
// MUST NOT be called more than once
func (i *indexer) ftLoopRoutine() {
ticker := time.NewTicker(ftIndexInterval)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-i.quit:
cancel()
case <-ctx.Done():
}
}()
ctx := i.runCtx

i.runFullTextIndexer(ctx)
defer close(i.ftQueueFinished)
Expand Down Expand Up @@ -229,7 +221,7 @@ func (i *indexer) ftInit() error {
return err
}
for _, id := range ids {
if err := i.store.AddToIndexQueue(id); err != nil {
if err := i.store.AddToIndexQueue(i.runCtx, id); err != nil {
return err
}
}
Expand Down
12 changes: 7 additions & 5 deletions core/indexer/fulltext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/anyproto/any-sync/app"
"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -85,10 +86,11 @@ func NewIndexerFixture(t *testing.T) *IndexerFixture {
indexerFx.ftsearch = indxr.ftsearch
indexerFx.pickerFx = mock_cache.NewMockObjectGetter(t)
indxr.picker = indexerFx.pickerFx
indxr.quit = make(chan struct{})
indxr.batcher = mb.New[indexTask](100)
indxr.forceFt = make(chan struct{})
indxr.config = &config.Config{NetworkMode: pb.RpcAccount_LocalOnly}

indxr.runCtx, indxr.runCtxCancel = context.WithCancel(ctx)
go indxr.indexBatchLoop()
return indexerFx
}

Expand Down Expand Up @@ -326,7 +328,7 @@ func TestRunFullTextIndexer(t *testing.T) {
blockbuilder.ID("blockId1"),
),
)))
indexerFx.store.AddToIndexQueue("objectId" + strconv.Itoa(i))
indexerFx.store.AddToIndexQueue(context.Background(), "objectId"+strconv.Itoa(i))
indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, "objectId"+strconv.Itoa(i)).Return(smartTest, nil).Once()
}

Expand All @@ -352,7 +354,7 @@ func TestRunFullTextIndexer(t *testing.T) {
),
)))
indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, "objectId"+strconv.Itoa(i)).Return(smartTest, nil).Once()
indexerFx.store.AddToIndexQueue("objectId" + strconv.Itoa(i))
indexerFx.store.AddToIndexQueue(context.Background(), "objectId"+strconv.Itoa(i))

}

Expand Down Expand Up @@ -381,7 +383,7 @@ func TestPrepareSearchDocument_Reindex_Removed(t *testing.T) {
blockbuilder.ID("blockId1"),
),
)))
indexerFx.store.AddToIndexQueue("objectId1")
indexerFx.store.AddToIndexQueue(context.Background(), "objectId1")
indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, mock.Anything).Return(smartTest, nil)
indexerFx.runFullTextIndexer(context.Background())

Expand Down
104 changes: 92 additions & 12 deletions core/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
"golang.org/x/exp/slices"

Expand Down Expand Up @@ -54,15 +55,23 @@ type Hasher interface {
Hash() string
}

type indexTask struct {
info smartblock.DocInfo
options []smartblock.IndexOption
done chan error
}

type indexer struct {
store objectstore.ObjectStore
fileStore filestore.FileStore
source source.Service
picker cache.ObjectGetter
ftsearch ftsearch.FTSearch
storageService storage.ClientStorage
batcher *mb.MB[indexTask]

quit chan struct{}
runCtx context.Context
runCtxCancel context.CancelFunc
ftQueueFinished chan struct{}
config *config.Config

Expand All @@ -81,10 +90,11 @@ func (i *indexer) Init(a *app.App) (err error) {
i.fileStore = app.MustComponent[filestore.FileStore](a)
i.ftsearch = app.MustComponent[ftsearch.FTSearch](a)
i.picker = app.MustComponent[cache.ObjectGetter](a)
i.quit = make(chan struct{})
i.ftQueueFinished = make(chan struct{})
i.runCtx, i.runCtxCancel = context.WithCancel(context.Background())
i.forceFt = make(chan struct{})
i.config = app.MustComponent[*config.Config](a)
i.batcher = mb.New[indexTask](100)
go i.indexBatchLoop()
return
}

Expand All @@ -100,14 +110,71 @@ func (i *indexer) StartFullTextIndex() (err error) {
if ftErr := i.ftInit(); ftErr != nil {
log.Errorf("can't init ft: %v", ftErr)
}
i.ftQueueFinished = make(chan struct{})
go i.ftLoopRoutine()
return
}

func (i *indexer) indexBatchLoop() {
for {
tasks, err := i.batcher.Wait(i.runCtx)
if err != nil {
return
}
if iErr := i.indexBatch(tasks); iErr != nil {
log.Warnf("indexBatch error: %v", iErr)
}
}
}

func (i *indexer) indexBatch(tasks []indexTask) (err error) {
tx, err := i.store.WriteTx(i.runCtx)
if err != nil {
return err
}
st := time.Now()

closeTasks := func(closeErr error) {
for _, t := range tasks {
if closeErr != nil {
select {
case t.done <- closeErr:
default:
}
} else {
close(t.done)
}
}
}

defer func() {
if err != nil {
_ = tx.Rollback()

Check failure on line 152 in core/indexer/indexer.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `tx.Rollback` is not checked (errcheck)
} else {
if err = tx.Commit(); err != nil {
closeTasks(err)
} else {
closeTasks(nil)
}
log.Infof("indexBatch: indexed %d docs for a %v: err: %v", len(tasks), time.Since(st), err)
}
}()

for _, task := range tasks {
if iErr := i.index(tx.Context(), task.info, task.options...); iErr != nil {
task.done <- iErr
}
}
return
}

func (i *indexer) Close(ctx context.Context) (err error) {
close(i.quit)
// we need to wait for the ftQueue processing to be finished gracefully. Because we may be in the middle of badger transaction
<-i.ftQueueFinished
_ = i.batcher.Close()
if i.runCtxCancel != nil {
i.runCtxCancel()
// we need to wait for the ftQueue processing to be finished gracefully. Because we may be in the middle of badger transaction
<-i.ftQueueFinished
}
return nil
}

Expand All @@ -129,10 +196,23 @@ func (i *indexer) RemoveAclIndexes(spaceId string) (err error) {
if err != nil {
return
}
return i.store.DeleteDetails(ids...)
return i.store.DeleteDetails(i.runCtx, ids...)
}

func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options ...smartblock.IndexOption) error {
done := make(chan error)
if err := i.batcher.Add(ctx, indexTask{
info: info,
options: options,
done: done,
}); err != nil {
return err
}
err, _ := <-done
return err
}

func (i *indexer) index(ctx context.Context, info smartblock.DocInfo, options ...smartblock.IndexOption) error {
// options are stored in smartblock pkg because of cyclic dependency :(
startTime := time.Now()
opts := &smartblock.IndexOptions{}
Expand All @@ -150,7 +230,7 @@ func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options ..
return
}

err = i.store.SaveLastIndexedHeadsHash(info.Id, headHashToIndex)
err = i.store.SaveLastIndexedHeadsHash(ctx, info.Id, headHashToIndex)
if err != nil {
log.With("objectID", info.Id).Errorf("failed to save indexed heads hash: %v", err)
}
Expand All @@ -161,7 +241,7 @@ func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options ..
return nil
}

lastIndexedHash, err := i.store.GetLastIndexedHeadsHash(info.Id)
lastIndexedHash, err := i.store.GetLastIndexedHeadsHash(ctx, info.Id)
if err != nil {
log.With("object", info.Id).Errorf("failed to get last indexed heads hash: %v", err)
}
Expand All @@ -180,7 +260,7 @@ func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options ..
indexSetTime := time.Now()
var hasError bool
if indexLinks {
if err = i.store.UpdateObjectLinks(info.Id, info.Links); err != nil {
if err = i.store.UpdateObjectLinks(ctx, info.Id, info.Links); err != nil {
hasError = true
log.With("objectID", info.Id).Errorf("failed to save object links: %v", err)
}
Expand Down Expand Up @@ -208,12 +288,12 @@ func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options ..
}

if !(opts.SkipFullTextIfHeadsNotChanged && lastIndexedHash == headHashToIndex) {
if err := i.store.AddToIndexQueue(info.Id); err != nil {
if err := i.store.AddToIndexQueue(ctx, info.Id); err != nil {
log.With("objectID", info.Id).Errorf("can't add id to index queue: %v", err)
}
}
} else {
_ = i.store.DeleteDetails(info.Id)
_ = i.store.DeleteDetails(ctx, info.Id)

Check failure on line 296 in core/indexer/indexer.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `i.store.DeleteDetails` is not checked (errcheck)
}
indexDetailsTime := time.Now()
detailsCount := 0
Expand Down
8 changes: 5 additions & 3 deletions core/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/anyproto/anytype-heart/tests/testutil"
)

var ctx = context.Background()

func TestIndexer(t *testing.T) {
for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -44,7 +46,7 @@ func TestIndexer(t *testing.T) {

smartTest.SetType(coresb.SmartBlockTypePage)
indexerFx.storageServiceFx.EXPECT().BindSpaceID(mock.Anything, mock.Anything).Return(nil)
indexerFx.store.SaveLastIndexedHeadsHash("objectId1", "7f40bc2814f5297818461f889780a870ea033fe64c5a261117f2b662515a3dba")
indexerFx.store.SaveLastIndexedHeadsHash(ctx, "objectId1", "7f40bc2814f5297818461f889780a870ea033fe64c5a261117f2b662515a3dba")

// when
err := indexerFx.Index(context.Background(), smartTest.GetDocInfo(), testCase.options)
Expand All @@ -71,7 +73,7 @@ func TestIndexer(t *testing.T) {

smartTest.SetType(coresb.SmartBlockTypePage)
indexerFx.storageServiceFx.EXPECT().BindSpaceID(mock.Anything, mock.Anything).Return(nil)
indexerFx.store.SaveLastIndexedHeadsHash("objectId1", "randomHash")
indexerFx.store.SaveLastIndexedHeadsHash(ctx, "objectId1", "randomHash")

// when
err := indexerFx.Index(context.Background(), smartTest.GetDocInfo(), testCase.options)
Expand Down Expand Up @@ -99,7 +101,7 @@ func TestIndexer(t *testing.T) {

smartTest.SetType(coresb.SmartBlockTypePage)
indexerFx.storageServiceFx.EXPECT().BindSpaceID(mock.Anything, mock.Anything).Return(nil)
indexerFx.store.SaveLastIndexedHeadsHash("objectId1", "7f40bc2814f5297818461f889780a870ea033fe64c5a261117f2b662515a3dba")
indexerFx.store.SaveLastIndexedHeadsHash(ctx, "objectId1", "7f40bc2814f5297818461f889780a870ea033fe64c5a261117f2b662515a3dba")

// when
err := indexerFx.Index(context.Background(), smartTest.GetDocInfo())
Expand Down
8 changes: 4 additions & 4 deletions core/indexer/reindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (i *indexer) removeOldFiles(spaceId string, flags reindexFlags) error {
}
for _, id := range ids {
if domain.IsFileId(id) {
err = i.store.DeleteDetails(id)
err = i.store.DeleteDetails(i.runCtx, id)
if err != nil {
log.Errorf("delete old file %s: %s", id, err)
}
Expand Down Expand Up @@ -382,7 +382,7 @@ func (i *indexer) removeDetails(spaceId string) error {
log.Errorf("reindex failed to get all ids(removeAllIndexedObjects): %v", err)
}
for _, id := range ids {
if err = i.store.DeleteDetails(id); err != nil {
if err = i.store.DeleteDetails(i.runCtx, id); err != nil {
log.Errorf("reindex failed to delete details(removeAllIndexedObjects): %v", err)
}
}
Expand Down Expand Up @@ -412,7 +412,7 @@ func (i *indexer) removeOldObjects() (err error) {
return
}

err = i.store.DeleteDetails(ids...)
err = i.store.DeleteDetails(i.runCtx, ids...)
log.With(zap.Int("count", len(ids)), zap.Error(err)).Warnf("removeOldObjects")
return err
}
Expand Down Expand Up @@ -484,7 +484,7 @@ func (i *indexer) reindexOutdatedObjects(ctx context.Context, space clientspace.
log.With("tree", tid).Errorf("reindexOutdatedObjects failed to get tree to reindex: %s", err)
}

lastHash, err := i.store.GetLastIndexedHeadsHash(tid)
lastHash, err := i.store.GetLastIndexedHeadsHash(ctx, tid)
if err != nil {
logErr(err)
continue
Expand Down
Loading

0 comments on commit 6713d40

Please sign in to comment.