-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathfunc_reduce.go
43 lines (35 loc) · 915 Bytes
/
func_reduce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package fgbase
import ()
// FuncReduce reduces a stream of data into a single empty interface.
func FuncReduce(a, x Edge, reducer func(n *Node, datum, collection interface{}) interface{}, freerun bool) Node {
var reduceFreerunRdy = func(n *Node) bool {
a := n.Srcs[0]
x := n.Dsts[0]
return a.SrcRdy(n) || x.DstRdy(n)
}
var reduceFreerunFire = func(n *Node) error {
if a.SrcRdy(n) {
n.Aux = reducer(n, a.SrcGet(), n.Aux)
}
if x.DstRdy(n) {
x.DstPut(n.Aux)
}
return nil
}
var reduceSteppedFire = func(n *Node) error {
n.Aux = reducer(n, a.SrcGet(), n.Aux)
x.DstPut(n.Aux)
return nil
}
var reduceRdy func(n *Node) bool
if freerun {
reduceRdy = reduceFreerunRdy
}
var reduceFire = reduceSteppedFire
if freerun {
reduceFire = reduceFreerunFire
}
node := MakeNode("reduce", []*Edge{&a}, []*Edge{&x}, reduceRdy, reduceFire)
node.Aux = make([]string, 0)
return node
}