-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgatherer.go
60 lines (52 loc) · 1.64 KB
/
gatherer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package taskgroup
import "sync"
// A Gatherer manages a group of [Task] functions that report values, and
// gathers the values they return.
//
// Values reach the gather callback only through the unexported report method,
// which holds μ for the duration of each call, so the callback is never
// invoked by two tasks at the same time. Use [Gather] to construct a
// Gatherer. Because it contains a sync.Mutex, a Gatherer must not be copied
// after first use; all of its methods take a pointer receiver.
type Gatherer[T any] struct {
	run func(Task) // caller-supplied runner; expected to start the task in a goroutine

	μ      sync.Mutex // guards calls to gather
	gather func(T)    // handle values reported by tasks; set non-nil by Gather
}
// report hands value to the gather callback of g while holding g.μ, so that
// values reported concurrently by different tasks are delivered one at a time.
// The lock is released even if the callback panics.
func (g *Gatherer[T]) report(value T) {
	mu := &g.μ
	mu.Lock()
	defer mu.Unlock()

	g.gather(value)
}
// Gather constructs a new, empty [Gatherer] whose tasks return values of type
// T and are executed by passing them to run.
//
// When gather is non-nil, each value reported by a successful task is passed
// to it. Those calls are serialized, so at most one call to gather is in
// progress at any time. When gather is nil, reported values are discarded.
//
// Gather panics if run == nil.
func Gather[T any](run func(Task), gather func(T)) *Gatherer[T] {
	if run == nil {
		panic("run function is nil")
	}

	// Start from a callback that discards values, and replace it only when
	// the caller supplied one, so g.gather is never nil.
	g := &Gatherer[T]{run: run, gather: func(T) {}}
	if gather != nil {
		g.gather = gather
	}
	return g
}
// Call schedules f by passing a task to the run function of g. When f returns
// a nil error, the value it returned is gathered. When f returns a non-nil
// error, the value is dropped and the error is returned to the runner.
func (g *Gatherer[T]) Call(f func() (T, error)) {
	task := func() error {
		value, err := f()
		if err != nil {
			return err
		}
		g.report(value)
		return nil
	}
	g.run(task)
}
// Run schedules f by passing a task to the run function of g, and gathers the
// value f returns. The task handed to the runner always reports a nil error.
func (g *Gatherer[T]) Run(f func() T) {
	g.run(func() error {
		result := f()
		g.report(result)
		return nil
	})
}
// Report schedules f by passing a task to the run function of g. The task
// calls f with a report function; every value f passes to that function is
// gathered. The error returned by f, if any, is returned to the runner. Values
// reported before f returns are gathered whether or not f returns an error.
func (g *Gatherer[T]) Report(f func(report func(T)) error) {
	task := func() error {
		return f(g.report)
	}
	g.run(task)
}