Skip to content

Commit

Permalink
feat(rstream-graph): add stop(), fix const inputs, update docs/readme
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Jun 21, 2018
1 parent cc5b736 commit d0b1e5c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 14 deletions.
93 changes: 89 additions & 4 deletions packages/rstream-graph/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ This project is part of the

Declarative, reactive dataflow graph construction using
[@thi.ng/rstream](/~https://github.com/thi-ng/umbrella/tree/master/packages/rstream),
[@thi.ng/atom](/~https://github.com/thi-ng/umbrella/tree/master/packages/atom) and [@thi.ng/transducers](/~https://github.com/thi-ng/umbrella/tree/master/packages/transducers)
[@thi.ng/atom](/~https://github.com/thi-ng/umbrella/tree/master/packages/atom)
and
[@thi.ng/transducers](/~https://github.com/thi-ng/umbrella/tree/master/packages/transducers)
primitives.

Stream subscription types act as graph nodes and attached transducers as
graph edges, transforming data for downstream consumers / nodes.
Theoretically, allows cycles and is not restricted to DAG topologies,
but care must be taken to avoid CPU hogging (user's responsibility).
but care must be taken to avoid CPU hogging if those cycles are causing
synchronous computation loops (it the user's responsibility to avoid
these).

## Installation

Expand Down Expand Up @@ -90,8 +94,89 @@ setTimeout(() => state.resetIn("a", 10), 1000);
// result: 360
```

Please see documentation in the source code & test cases for further
details.
## Graph specification

A dataflow graph spec is a plain object where keys are node names and
their values are `NodeSpec`s, defining a node's inputs, outputs and the
operation to be applied to produce one or more result streams.

```ts
interface NodeSpec {
fn: NodeFactory<any>;
ins: IObjectOf<NodeInputSpec>;
outs?: IObjectOf<NodeOutputSpec>;
}
```

Specification for a single "node" in the dataflow graph. Nodes here are
actually just wrappers of streams / subscriptions (or generally any form
of
[@thi.ng/rstream](/~https://github.com/thi-ng/umbrella/tree/master/packages/rstream)
`ISubscribable`), usually with an associated transducer to transform /
combine the inputs and produce values for the node's result stream.

The `fn` function is responsible to produce such a stream transformer
construct and the library provides several general purpose helpers for
that purpose. The keys used to specify inputs in the `ins` object are
dictated by the actual node `fn` used. Most node functions with multiple
inputs will be implemented as
[`StreamSync`](/~https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/stream-sync.ts)
instances and the input IDs are used to locally rename input streams
within the `StreamSync` container. Alo see `initGraph` and
`nodeFromSpec` (in
[`/src/nodes.ts`](/~https://github.com/thi-ng/umbrella/tree/master/packages/rstream-graph/src/nodes.ts)
for more details how these specs are compiled into stream constructs.

Specification for a single input, which can be given in different ways:

1) Create a stream of value changes at given path in state
[Atom](/~https://github.com/thi-ng/umbrella/e/master/packages/atom)
(passed to `initGraph`):

```ts
{ path: "nested.src.path" }
{ path: ["nested", "src", "path"] }
```

2) Reference path to another node's output in the GraphSpec object. See
[@thi.ng/resolve-map](/~https://github.com/thi-ng/umbrella/tree/master/packages/resolve-map)
for details.

```ts
{ stream: "/node-id/node" } // main node output
{ stream: "/node-id/outs/foo" } // specific output
```

3) Reference another node indirectly. The passed in `resolve` function
can be used to lookup other nodes, with the same logic as above. E.g.
the following spec looks up the main output of node "abc" and adds a
transformed subscription, which is then used as input for current
node.

```ts
{ stream: (resolve) =>
resolve("/abc/node").subscribe(map(x => x * 10)) }
```

4) Provide an external input stream:

```ts
{ stream: () => fromIterable([1,2,3], 500) }
```

5) Single value input stream:

```ts
{ const: 1 }
{ const: () => 1 }
```

If the optional `xform` key is given, a subscription with the given
transducer is added to the input and then used as input instead. This is
allows for further pre-processing of inputs.

Please see detailed documentation in the source code & test cases for
further details.

## Authors

Expand Down
17 changes: 8 additions & 9 deletions packages/rstream-graph/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ export interface Node {

/**
* A dataflow graph spec is simply an object where keys are node names
* and their values are either pre-existing @thi.ng/rstream
* `ISubscribable`s, functions returning `ISubscribable`s or
* `NodeSpec`s, defining a node's inputs, outputs and the operation to
* be applied to produce one or more result streams.
* and their values are `NodeSpec`s, defining a node's inputs, outputs
* and the operation to be applied to produce one or more result
* streams.
*/
export type GraphSpec = IObjectOf<
NodeSpec |
Expand All @@ -33,15 +32,15 @@ export type GraphSpec = IObjectOf<

/**
* Specification for a single "node" in the dataflow graph. Nodes here
* are actually streams / qsubscriptions (or just generally any form of
* @thi.ng/rstream `ISubscribable`), usually with an associated
* transducer to transform / combine the inputs and produce values for
* the node's result stream.
* are actually just wrappers of streams / subscriptions (or generally
* any form of thi.ng/rstream `ISubscribable`), usually with an
* associated transducer to transform / combine the inputs and produce
* values for the node's result stream.
*
* The `fn` function is responsible to produce such a stream transformer
* construct. The keys used to specify inputs in the `ins` object are
* dictated by the actual node `fn` used. Most node functions with
* multiple inputs are implemented as `StreamSync` instances and the
* multiple inputs will be implemented as `StreamSync` instances and the
* input IDs are used to locally rename input streams within the
* `StreamSync` container.
*
Expand Down
14 changes: 13 additions & 1 deletion packages/rstream-graph/src/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ const prepareNodeInputs = (ins: IObjectOf<NodeInputSpec>, state: IAtom<any>, res
s = isString(i.stream) ? resolve(i.stream) : i.stream(resolve);
}
else if (i.const) {
s = fromIterableSync([isFunction(i.const) ? i.const(resolve) : i.const]);
s = fromIterableSync([isFunction(i.const) ? i.const(resolve) : i.const], false);
}
else {
illegalArgs(`invalid node input: ${id}`);
Expand Down Expand Up @@ -189,6 +189,18 @@ export const removeNode = (graph: Graph, id: string) => {
return false;
};

/**
* Calls `.unsubscribe()` on all nodes in the graph, causing all related
* streams & subscriptions to terminate.
*
* @param graph
*/
export const stop = (graph: Graph) => {
for (let id in graph) {
graph[id].node.unsubscribe();
}
};

/**
* Higher order node / stream creator. Takes a transducer and (optional)
* required input stream IDs. The returned function takes an object of
Expand Down

0 comments on commit d0b1e5c

Please sign in to comment.