Skip to content

Commit

Permalink
WaitAsyncCommit
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed May 16, 2023
1 parent b01e2d9 commit a2183b0
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 33 deletions.
2 changes: 1 addition & 1 deletion app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type MemIAVLConfig struct {
// the zero-copied slices must not be retained beyond current block's execution.
ZeroCopy bool `mapstructure:"zero-copy"`
// AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up
// performance, 0 means synchronous commit.
// performance, -1 means synchronous commit.
AsyncCommitBuffer int `mapstructure:"async-commit-buffer"`
}

Expand Down
2 changes: 1 addition & 1 deletion app/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ enable = {{ .MemIAVL.Enable }}
zero-copy = {{ .MemIAVL.ZeroCopy }}
# AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up
# performance, 0 means synchronous commit.
# performance, -1 means synchronous commit.
async-commit-buffer = {{ .MemIAVL.AsyncCommitBuffer }}
`
83 changes: 52 additions & 31 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type DB struct {
pruneSnapshotLock sync.Mutex

// invariant: the LastIndex always match the current version of MultiTree
wal *wal.Log
walChan chan *walEntry
walQuit chan error
wal *wal.Log
walChanSize int
walChan chan *walEntry
walQuit chan error

// pending store upgrades, will be written into WAL in next Commit call
pendingUpgrades []*TreeNameUpgrade
Expand All @@ -63,7 +64,7 @@ type Options struct {
SnapshotKeepRecent uint32
// load the target version instead of latest version
TargetVersion uint32
// Buffer size for the asynchronous commit queue, 0 means synchronous commit
// Buffer size for the asynchronous commit queue, -1 means synchronous commit
AsyncCommitBuffer int
// ZeroCopy if true, the get and iterator methods could return a slice pointing to mmaped blob files.
ZeroCopy bool
Expand Down Expand Up @@ -97,36 +98,11 @@ func Load(dir string, opts Options) (*DB, error) {
return nil, err
}

var (
walChan chan *walEntry
walQuit chan error
)
if opts.AsyncCommitBuffer > 0 {
walChan = make(chan *walEntry, opts.AsyncCommitBuffer)
walQuit = make(chan error)
go func() {
defer close(walQuit)

for entry := range walChan {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
}
if err := wal.Write(entry.index, bz); err != nil {
walQuit <- err
return
}
}
}()
}

db := &DB{
MultiTree: *mtree,
dir: dir,
wal: wal,
walChan: walChan,
walQuit: walQuit,
walChanSize: opts.AsyncCommitBuffer,
snapshotKeepRecent: opts.SnapshotKeepRecent,
}

Expand Down Expand Up @@ -269,7 +245,11 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
Changesets: changeSets,
Upgrades: db.pendingUpgrades,
}}
if db.walChan != nil {
if db.walChanSize >= 0 {
if db.walChan == nil {
db.initAsyncCommit()
}

// async wal writing
db.walChan <- &entry
} else {
Expand All @@ -288,6 +268,47 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
return hash, v, nil
}

func (db *DB) initAsyncCommit() {
walChan := make(chan *walEntry, db.walChanSize)
walQuit := make(chan error)

go func() {
defer close(walQuit)

for entry := range walChan {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
}
if err := db.wal.Write(entry.index, bz); err != nil {
walQuit <- err
return
}
}
}()

db.walChan = walChan
db.walQuit = walQuit
}

// WaitAsyncCommit waits for the completion of async commit
func (db *DB) WaitAsyncCommit() error {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.walChan == nil {
return nil
}

close(db.walChan)
err := <-db.walQuit

db.walChan = nil
db.walQuit = nil
return err
}

func (db *DB) Copy() *DB {
db.mtx.Lock()
defer db.mtx.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (rs *Store) Commit() types.CommitID {
return rs.lastCommitInfo.CommitID()
}

func (rs *Store) WaitAsyncCommit() error {
return rs.db.WaitAsyncCommit()
}

// Implements interface Committer
func (rs *Store) LastCommitID() types.CommitID {
return rs.lastCommitInfo.CommitID()
Expand Down

0 comments on commit a2183b0

Please sign in to comment.