From 062c6f2b1a56d93f56f97c3c4e232909f3b4c5a4 Mon Sep 17 00:00:00 2001 From: Sergey Date: Mon, 5 Jun 2023 20:54:51 +0500 Subject: [PATCH] GO-1465: Fix uploading logic - Fix annoying Limit reached event - Fix indexing issues when user's file limit is reached AND account freshly recovered with empty data folder --- core/files/files.go | 8 +- core/filestorage/filesync/filesync.go | 2 +- core/filestorage/filesync/filesync_test.go | 2 +- core/filestorage/filesync/filesyncstore.go | 19 +++++ core/filestorage/filesync/upload.go | 95 ++++++++++++++-------- 5 files changed, 90 insertions(+), 36 deletions(-) diff --git a/core/files/files.go b/core/files/files.go index 18820fd0ae..b11be30807 100644 --- a/core/files/files.go +++ b/core/files/files.go @@ -394,7 +394,7 @@ func (s *service) fileIndexLink(ctx context.Context, inode ipld.Node, fileID str if err := s.fileStore.AddTarget(linkID, fileID); err != nil { return fmt.Errorf("add target to %s: %w", linkID, err) } - if err := s.AddToSyncQueue(fileID); err != nil { + if err := s.addToSyncQueue(fileID, true); err != nil { return fmt.Errorf("add file %s to sync queue: %w", fileID, err) } return nil @@ -806,9 +806,13 @@ func (s *service) fileIndexInfo(ctx context.Context, hash string, updateIfExists } func (s *service) AddToSyncQueue(fileID string) error { + return s.addToSyncQueue(fileID, false) +} + +func (s *service) addToSyncQueue(fileID string, uploadedByUser bool) error { spaceID := s.spaceService.AccountId() - if err := s.fileSync.AddFile(spaceID, fileID); err != nil { + if err := s.fileSync.AddFile(spaceID, fileID, uploadedByUser); err != nil { return fmt.Errorf("add file to sync queue: %w", err) } if _, err := s.syncStatusWatcher.Watch(fileID, nil); err != nil { diff --git a/core/filestorage/filesync/filesync.go b/core/filestorage/filesync/filesync.go index c647240262..acb35c64a4 100644 --- a/core/filestorage/filesync/filesync.go +++ b/core/filestorage/filesync/filesync.go @@ -29,7 +29,7 @@ var errReachedLimit = fmt.Errorf("file upload limit has been reached") //go:generate mockgen -package mock_filesync -destination ./mock_filesync/filesync_mock.go github.com/anyproto/anytype-heart/core/filestorage/filesync FileSync type FileSync interface { - AddFile(spaceId, fileId string) (err error) + AddFile(spaceId, fileId string, uploadedByUser bool) (err error) RemoveFile(spaceId, fileId string) (err error) SpaceStat(ctx context.Context, spaceId string) (ss SpaceStat, err error) FileStat(ctx context.Context, spaceId, fileId string) (fs FileStat, err error) diff --git a/core/filestorage/filesync/filesync_test.go b/core/filestorage/filesync/filesync_test.go index 1acb19ae4e..f7c38201d6 100644 --- a/core/filestorage/filesync/filesync_test.go +++ b/core/filestorage/filesync/filesync_test.go @@ -64,7 +64,7 @@ func TestFileSync_AddFile(t *testing.T) { fx.rpcStore.EXPECT().BindCids(gomock.Any(), spaceId, fileId, gomock.Any()).Return(nil) fx.rpcStore.EXPECT().SpaceInfo(gomock.Any(), spaceId).Return(&fileproto.SpaceInfoResponse{LimitBytes: 2 * 1024 * 1024}, nil).AnyTimes() fx.rpcStore.EXPECT().AddToFile(gomock.Any(), spaceId, fileId, gomock.Any()).AnyTimes() - require.NoError(t, fx.AddFile(spaceId, fileId)) + require.NoError(t, fx.AddFile(spaceId, fileId, false)) fx.waitEmptyQueue(t, time.Second*5) } diff --git a/core/filestorage/filesync/filesyncstore.go b/core/filestorage/filesync/filesyncstore.go index 57ca5ccef3..961fc9bb9e 100644 --- a/core/filestorage/filesync/filesyncstore.go +++ b/core/filestorage/filesync/filesyncstore.go @@ -109,6 +109,25 @@ func isKeyExists(txn *badger.Txn, key []byte) (bool, error) { return true, nil } +func (s *fileSyncStore) isFileQueued(spaceId, fileId string) (ok bool, err error) { + err = s.db.View(func(txn *badger.Txn) error { + ok, err = isKeyExists(txn, uploadKey(spaceId, fileId)) + if err != nil { + return fmt.Errorf("check upload key: %w", err) + } + if ok { + return nil + } + + ok, err = isKeyExists(txn, discardedKey(spaceId, fileId)) + if err != nil { + return fmt.Errorf("check discarded key: %w", err) + } + return nil + }) + return +} + func (s *fileSyncStore) HasUpload(spaceId, fileId string) (ok bool, err error) { err = s.db.View(func(txn *badger.Txn) error { ok, err = isKeyExists(txn, uploadKey(spaceId, fileId)) diff --git a/core/filestorage/filesync/upload.go b/core/filestorage/filesync/upload.go index c4c3f30034..59a62ddac8 100644 --- a/core/filestorage/filesync/upload.go +++ b/core/filestorage/filesync/upload.go @@ -21,7 +21,7 @@ import ( "github.com/anyproto/anytype-heart/pkg/lib/localstore" ) -func (f *fileSync) AddFile(spaceId, fileId string) (err error) { +func (f *fileSync) AddFile(spaceId, fileId string, uploadedByUser bool) (err error) { status, err := f.fileStore.GetSyncStatus(fileId) if err != nil && !errors.Is(err, localstore.ErrNotFound) { return fmt.Errorf("get file sync status: %w", err) @@ -38,6 +38,13 @@ func (f *fileSync) AddFile(spaceId, fileId string) (err error) { return nil } + if uploadedByUser { + err = f.checkAndNotifyAboutUploadLimits(spaceId, fileId) + if err != nil { + return fmt.Errorf("check upload limit: %w", err) + } + } + log.Info("add file to uploading queue", zap.String("fileID", fileId)) defer func() { if err == nil { @@ -51,6 +58,20 @@ func (f *fileSync) AddFile(spaceId, fileId string) (err error) { return } +func (f *fileSync) checkAndNotifyAboutUploadLimits(spaceId string, fileId string) error { + ok, err := f.queue.isFileQueued(spaceId, fileId) + if err != nil { + return fmt.Errorf("check if file is queued: %w", err) + } + if !ok { + _, err = f.prepareToUpload(context.Background(), spaceId, fileId) + if isLimitReachedErr(err) { + f.sendLimitReachedEvent(spaceId, fileId) + } + } + return nil +} + func (f *fileSync) addLoop() { f.addOperation() for { @@ -77,17 +98,17 @@ func (f *fileSync) addOperation() { } } -func (f *fileSync) getUpload() (spaceId string, fileId string, wasDiscarded bool, err error) { +func (f *fileSync) getUpload() (spaceId string, fileId string, err error) { spaceId, fileId, err = f.queue.GetUpload() if err == errQueueIsEmpty { spaceId, fileId, err = f.queue.GetDiscardedUpload() - return spaceId, fileId, true, err + return spaceId, fileId, err } - return spaceId, fileId, false, err + return spaceId, fileId, err } func (f *fileSync) tryToUpload() (string, error) { - spaceId, fileId, wasDiscarded, err := f.getUpload() + spaceId, fileId, err := f.getUpload() if err != nil { return fileId, err } @@ -100,10 +121,7 @@ func (f *fileSync) tryToUpload() (string, error) { return fileId, f.queue.DoneUpload(spaceId, fileId) } if err = f.uploadFile(f.loopCtx, spaceId, fileId); err != nil { - if errors.Is(err, errReachedLimit) || strings.Contains(err.Error(), fileprotoerr.ErrSpaceLimitExceeded.Error()) { - if !wasDiscarded { - f.sendLimitReachedEvent(spaceId, fileId) - } + if isLimitReachedErr(err) { log.Info("reached limit, push to discarded queue", zap.String("fileId", fileId)) if qerr := f.queue.QueueDiscarded(spaceId, fileId); qerr != nil { log.Warn("can't push upload task to discarded queue", zap.String("fileId", fileId), zap.Error(qerr)) @@ -124,6 +142,10 @@ func (f *fileSync) tryToUpload() (string, error) { return fileId, f.queue.DoneUpload(spaceId, fileId) } +func isLimitReachedErr(err error) bool { + return errors.Is(err, errReachedLimit) || strings.Contains(err.Error(), fileprotoerr.ErrSpaceLimitExceeded.Error()) +} + func (f *fileSync) uploadFile(ctx context.Context, spaceId, fileId string) (err error) { log.Info("uploading file", zap.String("fileId", fileId)) @@ -136,30 +158,9 @@ func (f *fileSync) uploadFile(ctx context.Context, spaceId, fileId string) (err _ = batcher.Close() }() - fileBlocks, err := f.collectFileBlocks(ctx, fileId) + blocksToUpload, err := f.prepareToUpload(ctx, spaceId, fileId) if err != nil { - return fmt.Errorf("collect file blocks: %w", err) - } - - bytesToUpload, blocksToUpload, err := f.selectBlocksToUploadAndBindExisting(ctx, spaceId, fileId, fileBlocks) - if err != nil { - return fmt.Errorf("select blocks to upload: %w", err) - } - - log.Info("collecting blocks to upload", - zap.String("fileID", fileId), - zap.Int("blocksToUpload", len(blocksToUpload)), - zap.Int("totalBlocks", len(fileBlocks)), - ) - - stat, err := f.SpaceStat(ctx, spaceId) - if err != nil { - return fmt.Errorf("get space stat: %w", err) - } - - bytesLeft := stat.BytesLimit - stat.BytesUsage - if bytesToUpload > bytesLeft { - return errReachedLimit + return err } go func() { @@ -194,6 +195,36 @@ func (f *fileSync) uploadFile(ctx context.Context, spaceId, fileId string) (err return <-dagErr } +func (f *fileSync) prepareToUpload(ctx context.Context, spaceId string, fileId string) ([]blocks.Block, error) { + fileBlocks, err := f.collectFileBlocks(ctx, fileId) + if err != nil { + return nil, fmt.Errorf("collect file blocks: %w", err) + } + + bytesToUpload, blocksToUpload, err := f.selectBlocksToUploadAndBindExisting(ctx, spaceId, fileId, fileBlocks) + if err != nil { + return nil, fmt.Errorf("select blocks to upload: %w", err) + } + + log.Info("collecting blocks to upload", + zap.String("fileID", fileId), + zap.Int("blocksToUpload", len(blocksToUpload)), + zap.Int("totalBlocks", len(fileBlocks)), + ) + + stat, err := f.SpaceStat(ctx, spaceId) + if err != nil { + return nil, fmt.Errorf("get space stat: %w", err) + } + + bytesLeft := stat.BytesLimit - stat.BytesUsage + if len(blocksToUpload) > 0 && bytesToUpload > bytesLeft { + return nil, errReachedLimit + } + + return blocksToUpload, nil +} + func (f *fileSync) hasFileInStore(fileID string) (bool, error) { roots, err := f.fileStore.ListByTarget(fileID) if err != localstore.ErrNotFound && err != nil {