-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig.go
99 lines (80 loc) · 2.39 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package batchify
import (
"time"
"github.com/samber/go-batchify/pkg/hasher"
"github.com/samber/lo"
)
// assertValue panics with msg when ok is false and does nothing otherwise.
// The config builders use it to reject invalid arguments.
func assertValue(ok bool, msg string) {
	if ok {
		return
	}

	panic(msg)
}
// NewBatchConfig starts a BatchConfig builder for a Batch.
//
// bufferSize must be at least 1, otherwise the function panics. do is stored
// as-is and handed to the batch when Build is called.
func NewBatchConfig[I comparable, O any](bufferSize int, do func([]I) (map[I]O, error)) BatchConfig[I, O] {
	assertValue(bufferSize >= 1, "buffer size must be a positive value")

	var cfg BatchConfig[I, O]
	cfg.bufferSize = bufferSize
	cfg.do = do
	return cfg
}
// BatchConfig holds the settings collected by the builder methods and
// consumed by Build.
type BatchConfig[I comparable, O any] struct {
// bufferSize is the batch buffer size; NewBatchConfig requires it to be >= 1.
bufferSize int
// do is the batch callback, forwarded unchanged to newBatch by Build.
do func([]I) (map[I]O, error)
// ttl is the max buffer duration, set by WithTimer; it stays at the zero
// value when WithTimer is never called.
ttl time.Duration
// shards is the shard count set by WithSharding; Build only creates a
// sharded batch when it is greater than 1.
shards int
// shardingFn is the hasher set by WithSharding and passed to
// newShardedBatch by Build.
shardingFn hasher.Hasher[I]
}
// WithTimer returns a copy of the config whose max buffer duration is ttl.
//
// It panics when ttl is negative. A zero ttl passes the check, even though
// the panic message asks for a positive value.
func (cfg BatchConfig[I, O]) WithTimer(ttl time.Duration) BatchConfig[I, O] {
	assertValue(ttl >= 0, "ttl must be a positive value")

	next := cfg
	next.ttl = ttl
	return next
}
// WithSharding returns a copy of the config with sharding enabled.
//
// nbr is the number of shards and must be greater than 1. fn is the hasher
// handed to the sharded batch by Build and must not be nil. The method panics
// when either requirement is violated.
func (cfg BatchConfig[I, O]) WithSharding(nbr int, fn hasher.Hasher[I]) BatchConfig[I, O] {
	assertValue(nbr > 1, "shards must be greater than 1")
	// The previous message ("hasher must be greater not nil") was a garbled
	// copy of the line above.
	assertValue(fn != nil, "hasher must not be nil")

	cfg.shards = nbr
	cfg.shardingFn = fn
	return cfg
}
// Build creates the Batch described by the config.
//
// With more than one shard configured, it creates one batch per shard and
// wraps them in a sharded batch that uses the configured hasher. Otherwise it
// returns a single batch.
func (cfg BatchConfig[I, O]) Build() Batch[I, O] {
	// The int parameter is the index supplied by lo.RepeatBy; every shard is
	// built from the same settings, so it is ignored.
	newShard := func(_ int) Batch[I, O] {
		return newBatch(cfg.bufferSize, cfg.ttl, cfg.do)
	}

	if cfg.shards <= 1 {
		return newShard(0)
	}

	return newShardedBatch(lo.RepeatBy(cfg.shards, newShard), cfg.shardingFn)
}
/**
* Shortcuts
*/
// NewBatch is a shortcut that builds a Batch with the given buffer size and
// callback, without a timer and without sharding.
func NewBatch[I comparable, O any](bufferSize int, do func([]I) (map[I]O, error)) Batch[I, O] {
	cfg := NewBatchConfig(bufferSize, do)
	return cfg.Build()
}
// NewBatchWithTimer is a shortcut that builds an unsharded Batch with the
// given buffer size and callback, and a max buffer duration of ttl.
func NewBatchWithTimer[I comparable, O any](bufferSize int, do func([]I) (map[I]O, error), ttl time.Duration) Batch[I, O] {
	cfg := NewBatchConfig(bufferSize, do)
	cfg = cfg.WithTimer(ttl)
	return cfg.Build()
}
// NewShardedBatch is a shortcut that builds a Batch split into the given
// number of shards, using hasher for sharding, without a timer.
func NewShardedBatch[I comparable, O any](shards int, hasher hasher.Hasher[I], bufferSize int, do func([]I) (map[I]O, error)) Batch[I, O] {
	cfg := NewBatchConfig(bufferSize, do)
	cfg = cfg.WithSharding(shards, hasher)
	return cfg.Build()
}
// NewShardedBatchWithTimer is a shortcut that builds a Batch split into the
// given number of shards, using hasher for sharding, with a max buffer
// duration of ttl.
func NewShardedBatchWithTimer[I comparable, O any](shards int, hasher hasher.Hasher[I], bufferSize int, do func([]I) (map[I]O, error), ttl time.Duration) Batch[I, O] {
	cfg := NewBatchConfig(bufferSize, do)
	// The timer is validated before sharding, as in the chained form.
	cfg = cfg.WithTimer(ttl)
	cfg = cfg.WithSharding(shards, hasher)
	return cfg.Build()
}