Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-4242 fix spaceIndexer deadlock on stop #1669

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/block/editor/smartblock/smartblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ type Locker interface {
}

type Indexer interface {
Index(ctx context.Context, info DocInfo, options ...IndexOption) error
Index(info DocInfo, options ...IndexOption) error
app.ComponentRunnable
}

Expand Down Expand Up @@ -1296,7 +1296,7 @@ func (sb *smartBlock) getDocInfo(st *state.State) DocInfo {

func (sb *smartBlock) runIndexer(s *state.State, opts ...IndexOption) {
docInfo := sb.getDocInfo(s)
if err := sb.indexer.Index(context.Background(), docInfo, opts...); err != nil {
if err := sb.indexer.Index(docInfo, opts...); err != nil {
log.Errorf("index object %s error: %s", sb.Id(), err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Indexer interface {
ReindexMarketplaceSpace(space clientspace.Space) error
ReindexSpace(space clientspace.Space) error
RemoveIndexes(spaceId string) (err error)
Index(ctx context.Context, info smartblock.DocInfo, options ...smartblock.IndexOption) error
Index(info smartblock.DocInfo, options ...smartblock.IndexOption) error
app.ComponentRunnable
}

Expand Down Expand Up @@ -136,7 +136,7 @@ func (i *indexer) RemoveAclIndexes(spaceId string) (err error) {
return i.store.SpaceIndex(spaceId).DeleteDetails(i.runCtx, ids)
}

func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options ...smartblock.IndexOption) error {
func (i *indexer) Index(info smartblock.DocInfo, options ...smartblock.IndexOption) error {
i.lock.Lock()
spaceInd, ok := i.spaceIndexers[info.Space.Id()]
if !ok {
Expand All @@ -145,5 +145,5 @@ func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options ..
}
i.lock.Unlock()

return spaceInd.Index(ctx, info, options...)
return spaceInd.Index(info, options...)
}
6 changes: 3 additions & 3 deletions core/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestIndexer(t *testing.T) {
indexerFx.store.SpaceIndex("spaceId1").SaveLastIndexedHeadsHash(ctx, "objectId1", "7f40bc2814f5297818461f889780a870ea033fe64c5a261117f2b662515a3dba")

// when
err := indexerFx.Index(context.Background(), smartTest.GetDocInfo(), testCase.options)
err := indexerFx.Index(smartTest.GetDocInfo(), testCase.options)

// then
assert.NoError(t, err)
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestIndexer(t *testing.T) {
indexerFx.store.SpaceIndex("spaceId1").SaveLastIndexedHeadsHash(ctx, "objectId1", "randomHash")

// when
err := indexerFx.Index(context.Background(), smartTest.GetDocInfo(), testCase.options)
err := indexerFx.Index(smartTest.GetDocInfo(), testCase.options)

// then
assert.NoError(t, err)
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestIndexer(t *testing.T) {
indexerFx.store.SpaceIndex("spaceId1").SaveLastIndexedHeadsHash(ctx, "objectId1", "7f40bc2814f5297818461f889780a870ea033fe64c5a261117f2b662515a3dba")

// when
err := indexerFx.Index(context.Background(), smartTest.GetDocInfo())
err := indexerFx.Index(smartTest.GetDocInfo())

// then
assert.NoError(t, err)
Expand Down
11 changes: 8 additions & 3 deletions core/indexer/reindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,15 +480,20 @@ func (i *indexer) reindexOutdatedObjects(ctx context.Context, space clientspace.
return len(idsToReindex), success, nil
}

func (i *indexer) reindexDoc(ctx context.Context, space smartblock.Space, id string) error {
func (i *indexer) reindexDoc(space smartblock.Space, id string) error {
return space.Do(id, func(sb smartblock.SmartBlock) error {
return i.Index(ctx, sb.GetDocInfo())
return i.Index(sb.GetDocInfo())
})
}

func (i *indexer) reindexIdsIgnoreErr(ctx context.Context, space smartblock.Space, ids ...string) (successfullyReindexed int) {
for _, id := range ids {
err := i.reindexDoc(ctx, space, id)
select {
case <-ctx.Done():
return
default:
}
err := i.reindexDoc(space, id)
if err != nil {
log.With("objectID", id).Errorf("failed to reindex: %v", err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions core/indexer/spaceindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func (i *spaceIndexer) indexBatch(tasks []indexTask) (err error) {
return
}

func (i *spaceIndexer) Index(ctx context.Context, info smartblock.DocInfo, options ...smartblock.IndexOption) error {
func (i *spaceIndexer) Index(info smartblock.DocInfo, options ...smartblock.IndexOption) error {
done := make(chan error)
if err := i.batcher.Add(ctx, indexTask{
if err := i.batcher.Add(i.runCtx, indexTask{
info: info,
options: options,
done: done,
Expand Down
Loading