-
Notifications
You must be signed in to change notification settings - Fork 273
/
Copy pathbatch.go
128 lines (109 loc) · 3.53 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package iavl
import (
"sync"
corestore "cosmossdk.io/core/store"
)
// BatchWithFlusher is a wrapper
// around batch that flushes batch's data to disk
// as soon as the configurable limit is reached.
type BatchWithFlusher struct {
mtx sync.Mutex
db corestore.KVStoreWithBatch // This is only used to create new batch
batch corestore.Batch // Batched writing buffer.
flushThreshold int // The threshold to flush the batch to disk.
}
var _ corestore.Batch = (*BatchWithFlusher)(nil)
// NewBatchWithFlusher returns new BatchWithFlusher wrapping the passed in batch
func NewBatchWithFlusher(db corestore.KVStoreWithBatch, flushThreshold int) *BatchWithFlusher {
return &BatchWithFlusher{
db: db,
batch: db.NewBatchWithSize(flushThreshold),
flushThreshold: flushThreshold,
}
}
// estimateSizeAfterSetting estimates the batch's size after setting a key / value
func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (int, error) {
currentSize, err := b.batch.GetByteSize()
if err != nil {
return 0, err
}
// for some batch implementation, when adding a key / value,
// the batch size could gain more than the total size of key and value,
// /~https://github.com/syndtr/goleveldb/blob/64ee5596c38af10edb6d93e1327b3ed1739747c7/leveldb/batch.go#L98
// we add 100 here just to over-account for that overhead
// since estimateSizeAfterSetting is only used to check if we exceed the threshold when setting a key / value
// this means we only over-account for the last key / value
return currentSize + len(key) + len(value) + 100, nil
}
// Set sets value at the given key to the db.
// If the set causes the underlying batch size to exceed flushThreshold,
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The addition entry is then added to the batch.
func (b *BatchWithFlusher) Set(key, value []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()
batchSizeAfter, err := b.estimateSizeAfterSetting(key, value)
if err != nil {
return err
}
if batchSizeAfter > b.flushThreshold {
b.mtx.Unlock()
if err := b.Write(); err != nil {
return err
}
b.mtx.Lock()
}
return b.batch.Set(key, value)
}
// Delete delete value at the given key to the db.
// If the deletion causes the underlying batch size to exceed batchSizeFlushThreshold,
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The deletion entry is then added to the batch.
func (b *BatchWithFlusher) Delete(key []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()
batchSizeAfter, err := b.estimateSizeAfterSetting(key, []byte{})
if err != nil {
return err
}
if batchSizeAfter > b.flushThreshold {
b.mtx.Unlock()
if err := b.Write(); err != nil {
return err
}
b.mtx.Lock()
}
return b.batch.Delete(key)
}
func (b *BatchWithFlusher) Write() error {
b.mtx.Lock()
defer b.mtx.Unlock()
if err := b.batch.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
return nil
}
func (b *BatchWithFlusher) WriteSync() error {
b.mtx.Lock()
defer b.mtx.Unlock()
if err := b.batch.WriteSync(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
return nil
}
func (b *BatchWithFlusher) Close() error {
b.mtx.Lock()
defer b.mtx.Unlock()
return b.batch.Close()
}
func (b *BatchWithFlusher) GetByteSize() (int, error) {
return b.batch.GetByteSize()
}