Timely dataflow is meant to be a fairly handy way to write dataflow computations, but did you know that it can be used to implement other communication and coordination patterns too?
It can! Not that you should do this, but it beats having to shell out to ZooKeeper to do something simple.
There are at least two examples I can think of: barrier synchronization and event serialization (putting things in an order, rather than turning into bytes). We are just going to do barriers today, because it is sunny out.
Let's take a simple example that comes up a lot in the evaluation of timely dataflow programs: you want to measure how long a computation takes, but to do this you need to get all workers to start their timers at the same time.
extern crate timely;
use std::time::{Duration, Instant};
fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let timer = Instant::now();
        // build some computation, ..
        worker.dataflow(|scope| {
        });
        // Each worker loads some data, takes non-uniform time.
        std::thread::sleep(Duration::from_secs(worker.index() as u64));
        println!("{:?}\tworker {} loaded", timer.elapsed(), worker.index());
        // run to completion
        while worker.step() { }
        println!("{:?}\tworker {} complete", timer.elapsed(), worker.index());
    }).unwrap(); // asserts error-free execution;
}
What is the gap between the two printed lines? We would like to think that is the time it takes to do the computation, but it is a bit more subtle than that. It is actually the time from when a worker finishes loading its data, until the whole computation completes. That includes time spent waiting for other workers to load up their data, which can be a very different amount of time.
The above program (with an empty dataflow) does not actually cause a problem (timely dataflow is too clever, and realizes that without a dataflow each worker can finish independently). But if we make it a bit more non-trivial, just a touch of data, we see (with two threads):
Echidnatron% cargo run --example barrier-ex -- -w2
Compiling timely v0.6.0 (file:///Users/mcsherry/Projects/timely-master)
Finished dev [unoptimized + debuginfo] target(s) in 4.28s
Running `target/debug/examples/barrier-ex -w2`
6.068µs Worker 0 loaded
1.002544178s Worker 1 loaded
1.003165254s Worker 1 complete
1.003172394s Worker 0 complete
Echidnatron%
We can reason a bit and say that we should look for the smallest gap between loading and completing (here: Worker 1), but this feels like a bit of a cheat.
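To make that reasoning concrete, here is a small sketch that recomputes the per-worker gaps from the four timestamps printed above. The helper name gaps is made up for illustration and nothing here is part of timely; it is just the subtraction done by hand.

```rust
use std::time::Duration;

/// For each worker, the gap between its "loaded" and "complete" timestamps.
fn gaps(loaded: &[Duration], complete: &[Duration]) -> Vec<Duration> {
    loaded.iter().zip(complete).map(|(l, c)| *c - *l).collect()
}

fn main() {
    // Timestamps copied from the two-worker run above (worker 0, then worker 1).
    let loaded = [Duration::new(0, 6_068), Duration::new(1, 2_544_178)];
    let complete = [Duration::new(1, 3_172_394), Duration::new(1, 3_165_254)];

    let per_worker = gaps(&loaded, &complete);
    for (index, gap) in per_worker.iter().enumerate() {
        println!("worker {}: {:?}", index, gap);
    }
    // The smallest gap belongs to the last worker to finish loading.
    println!("best estimate: {:?}", per_worker.iter().min().unwrap());
}
```

Worker 0's gap is a bit over a second, nearly all of it spent waiting for worker 1 to load, while worker 1's gap is well under a millisecond.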
You might have noticed that we actually got some pretty sweet synchronization out of
// run to completion
while worker.step() { }
where we run the dataflow graph to completion. We've got the ability to run multiple dataflow graphs; why don't we just make a new dataflow that does nothing and run it to completion to synchronize the workers?
// synchronize all workers with a barrier.
let probe = worker.dataflow::<(),_,_>(|scope| {
    let (_, input) = scope.new_input::<()>();
    input.probe()
});
while !probe.done() { worker.step(); }
This fragment has each worker build a new dataflow graph, with really not much in it. The timestamp type is (), and there is one input with a datatype of (). We never introduce any data, and just probe() the output to see when we can be sure that no other worker will produce any data. This happens only once each of the other workers has created and dropped its input handle (the _ above).
This actually works. If you drop this hunk of code in to a timely dataflow worker, the workers will chill out here until all other workers have also reached this point. Once they've all reached the point, all workers release and are somewhat synchronized (at least, they are all done with whatever they were doing beforehand).
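For comparison, here is roughly the same experiment with plain threads and the standard library's std::sync::Barrier, rather than timely. It is only an analogy: the sleeps stand in for loading data, and the function name release_times is made up for illustration. Unlike the timely fragment, a thread parked in barrier.wait() does nothing at all until the others arrive.

```rust
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};

/// Runs `peers` threads that "load" for different amounts of time, meet at a
/// barrier, and report how long after the common start they were released.
fn release_times(peers: usize) -> Vec<Duration> {
    let barrier = Arc::new(Barrier::new(peers));
    let timer = Instant::now();
    let handles: Vec<_> = (0..peers).map(|index| {
        let barrier = Arc::clone(&barrier);
        thread::spawn(move || {
            // Non-uniform "loading" time: thread `index` sleeps 50ms * index.
            thread::sleep(Duration::from_millis(50 * index as u64));
            // Nobody gets past this line until every thread has reached it.
            barrier.wait();
            timer.elapsed()
        })
    }).collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for (index, released) in release_times(3).iter().enumerate() {
        println!("thread {} released at {:?}", index, released);
    }
}
```

Every thread should report a release time no earlier than the slowest thread's loading time, which is the "somewhat synchronized" property described above.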
The above fragment is a bit wordy, so we could actually go and package it up into a timely dataflow method for you.
I did one better and put together a re-usable barrier (coming soon to a PR near you), in case you want to synchronize the workers multiple times (and don't want to create new dataflows each time). It's almost the same as the example above, except that the timestamp type is usize so that you can tick upwards each time you re-use the barrier.
//! Barrier synchronization.
use ::Allocate;
use progress::timestamp::RootTimestamp;
use progress::nested::product::Product;
use dataflow::{InputHandle, ProbeHandle};
use dataflow::scopes::root::Root;
/// A re-usable barrier synchronization mechanism.
pub struct Barrier<A: Allocate> {
    round: usize,
    input: InputHandle<usize, ()>,
    probe: ProbeHandle<Product<RootTimestamp, usize>>,
    worker: Root<A>,
}
impl<A: Allocate> Barrier<A> {
    /// Allocates a new barrier.
    pub fn new(worker: &mut Root<A>) -> Self {
        use dataflow::operators::{Input, Probe};
        let (input, probe) = worker.dataflow(|scope| {
            let (handle, stream) = scope.new_input::<()>();
            (handle, stream.probe())
        });
        Barrier { round: 0, input, probe, worker: worker.clone() }
    }
    /// Blocks until all other workers have reached this barrier.
    ///
    /// This method does *not* block dataflow execution, which continues to execute while
    /// we await the arrival of the other workers.
    pub fn wait(&mut self) {
        self.round += 1;
        self.input.advance_to(self.round);
        while self.probe.less_than(self.input.time()) {
            self.worker.step();
        }
    }
}
As indicated in the comment, the wait() method has the potentially cool property that it does not block dataflow execution. We keep executing the dataflow (self.worker.step()) while waiting for the synchronization signal, which means that we are also actively running all other live dataflows. Compare this with a more traditional distributed barrier, which might cause each worker to lock up until all other workers are ready, doing nothing at all while they wait.
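The same "keep working while you wait" shape can be sketched without timely at all. The following is a toy, not the Barrier above: a shared atomic counter plays the role of progress tracking, and a caller-supplied step closure plays the role of worker.step(). The names SteppingBarrier and run are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// A re-usable barrier that runs `step` while waiting, instead of blocking.
struct SteppingBarrier {
    round: usize,
    peers: usize,
    arrivals: Arc<AtomicUsize>, // total arrivals across all rounds and peers.
}

impl SteppingBarrier {
    fn new(peers: usize, arrivals: Arc<AtomicUsize>) -> Self {
        SteppingBarrier { round: 0, peers, arrivals }
    }

    /// Announces arrival, then calls `step` until every peer has arrived at this round.
    fn wait<F: FnMut()>(&mut self, mut step: F) {
        self.round += 1;
        self.arrivals.fetch_add(1, Ordering::SeqCst);
        while self.arrivals.load(Ordering::SeqCst) < self.round * self.peers {
            step();
        }
    }
}

/// Each of `peers` threads runs `rounds` barrier rounds and records how many
/// peers had checked in to each round at the moment it was released.
fn run(peers: usize, rounds: usize) -> Vec<Vec<usize>> {
    let arrivals = Arc::new(AtomicUsize::new(0));
    let checkins = Arc::new((0..rounds).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());
    let handles: Vec<_> = (0..peers).map(|_| {
        let mut barrier = SteppingBarrier::new(peers, Arc::clone(&arrivals));
        let checkins = Arc::clone(&checkins);
        thread::spawn(move || {
            (0..rounds).map(|round| {
                checkins[round].fetch_add(1, Ordering::SeqCst);
                // Stand-in for `worker.step()`: yield so other threads can run.
                barrier.wait(|| thread::yield_now());
                checkins[round].load(Ordering::SeqCst)
            }).collect::<Vec<_>>()
        })
    }).collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for (index, seen) in run(3, 4).iter().enumerate() {
        println!("worker {}: check-ins seen per round: {:?}", index, seen);
    }
}
```

Counting total arrivals and comparing against round * peers is what makes the barrier re-usable, in the same spirit as ticking the usize timestamp upwards each round.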
Let's look at a program which uses this new Barrier type:
extern crate timely;
use std::time::{Instant, Duration};
// Note: in a branch; can't have yet!
use timely::synchronization::Barrier;
fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let timer = Instant::now();
        let mut barrier = Barrier::new(worker);
        loop {
            // screw up synchronization, then re-synchronize.
            std::thread::sleep(Duration::from_secs(worker.index() as u64));
            let elapsed1 = timer.elapsed();
            barrier.wait();
            let elapsed2 = timer.elapsed();
            println!("Worker {}: {:?}\t{:?}", worker.index(), elapsed1, elapsed2);
        }
    }).unwrap(); // asserts error-free execution;
}
This program starts multiple workers, and repeatedly screws up the synchronization between them and then resynchronizes. It prints out the de-synchronized times and the synchronized times. Ideally the former should be all different, and the latter should be pretty much the same.
Echidnatron% cargo run --example barrier-ex -- -w2
Compiling timely v0.6.0 (file:///Users/mcsherry/Projects/timely-master)
Finished dev [unoptimized + debuginfo] target(s) in 4.14s
Running `target/debug/examples/barrier-ex -w2`
Worker 0: 624.773µs 1.003743907s
Worker 1: 1.003576543s 1.003751088s
Worker 1: 2.005013854s 2.005141737s
Worker 0: 1.003813969s 2.005136371s
Worker 0: 2.005176509s 3.010288135s
Worker 1: 3.010185988s 3.010295034s
Worker 1: 4.015362201s 4.015469842s
Worker 0: 3.010313215s 4.015464516s
^C
Echidnatron%
Yeah, pretty good.
Let's imagine you have a bunch of somewhat independent workers, each with a list of things to do, and we need to put these things into some order.
This problem actually shows up in timely dataflow computations, whenever you want to build a timely dataflow server that can dynamically spin up new dataflow graphs in response to user input. One example is the differential dataflow graph server and another is Nikolas Göbel's declarative dataflow server.
Each of these systems accepts user queries and translates them into dataflows, but it is crucially important that each worker builds them in the same order (maybe that is a bug, but it is what it is for now).
So, let's build a handy synchronization mechanism that uses timely dataflow to sequence elements proposed by workers so that all workers see the same list in the same order (called atomic broadcast in the fault-tolerant computing space, which is not where we are).
Let's start with the simple parts of our implementation. A Sequencer will be a pair of shared lists, where it inserts into one of the lists (send) and draws elements off of the other list (recv).
/// Orders elements inserted across all workers.
///
/// A Sequencer allows each worker to insert into a consistent ordered sequence
/// that is seen by all workers in the same order.
pub struct Sequencer<T> {
    send: Rc<RefCell<VecDeque<T>>>, // proposed items.
    recv: Rc<RefCell<VecDeque<T>>>, // sequenced items.
}
impl<T: Ord+ExchangeData> Sequencer<T> {
    /// Creates a new Sequencer.
    ///
    /// The `timer` instant is used to synchronize the workers, who use this
    /// elapsed time as their timestamp. Elements are ordered by this time,
    /// and cannot be made visible until all workers have reached the time.
    pub fn new<A: Allocate>(worker: &mut Root<A>, timer: Instant) -> Self {
        unimplemented!()
    }
    /// Adds an element to the shared log.
    pub fn push(&mut self, element: T) {
        self.send.borrow_mut().push_back(element);
    }
    /// Reads the next element from the shared log.
    pub fn next(&mut self) -> Option<T> {
        self.recv.borrow_mut().pop_front()
    }
}
These lists are going to need to be shared with something, and we haven't seen that yet. We've also left the method that constructs a new Sequencer unimplemented, and you might expect that this is where most of the magic lives (indeed, it is the only place with a timely dataflow hook, the worker: &mut Root<A> handle).
But, informally, all that a Sequencer does here is push onto a shared list and read elements off of a (probably different) shared list.
Let's look at the implementation of new().
Informally, the new() method will create a new dataflow with two operators, a source and a sink. The timestamp type will be "elapsed nanoseconds", and the intent is that each worker's source produces elements enqueued by its Sequencer at the local time, and each worker's sink pulls out records and orders them by the time at which they were enqueued. By using timely's progress tracking, each sink can be sure it has seen all records with certain timestamps, and produces output only for those times that all workers have marked as completed.
pub fn new<A: Allocate>(worker: &mut Root<A>, timer: Instant) -> Self {
    // Create shared input and output queues.
    let send = Rc::new(RefCell::new(VecDeque::new()));
    let recv = Rc::new(RefCell::new(VecDeque::new()));
    let send_weak = Rc::downgrade(&send);
    let recv_weak = Rc::downgrade(&recv);
    // build a dataflow to order elements.
    worker.dataflow(move |dataflow| {
        let peers = dataflow.peers(); // used in source.
        let mut recvd = Vec::new(); // used in sink.
        source(
            dataflow,
            "SequenceInput",
            move |capability| {
                // pull from `send_weak`, send to all.
                unimplemented!()
            }
        )
        .sink(
            Exchange::new(|x: &(usize, T)| x.0 as u64),
            "SequenceOutput",
            move |input| {
                // recv from all, push into `recv_weak`.
                unimplemented!()
            }
        );
    });
    Sequencer { send, recv, }
}
There are still some unimplemented!() lines here, but the rough structure should start to be clear. Let's look at each of the parts!
The timely dataflow source operator is a dataflow operator with one output and zero inputs. It's like an input, but programmable.
If you check out its signature, once the nausea passes you will see that source mainly needs a function from a "capability" to a function we can repeatedly call on an output handle (something we can send data at).
A capability is what gives an operator its ability to send data. As long as an operator holds on to a capability for a specific time, the operator will be able to send data with that timestamp. But with great power comes great responsibility! As long as the operator holds the capability, downstream operators will not be able to reason about the completion of times beyond that of the capability. Our operator must carefully manage a capability, downgrading and perhaps eventually dropping it, to allow downstream operators to make progress.
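One way to build intuition for this is a toy model with integer times, where each source either holds a capability at some time or has dropped it. This is only a sketch: real timely frontiers are sets of possibly partially ordered times, and progress tracking also accounts for messages in flight. The names frontier and is_complete below are illustrative, not timely's API.

```rust
/// Toy model: each source holds `Some(time)` while it may still send at that
/// time or later, and `None` once it has dropped its capability.
fn frontier(capabilities: &[Option<u64>]) -> Option<u64> {
    capabilities.iter().filter_map(|c| *c).min()
}

/// A time is complete once no held capability could still produce it.
fn is_complete(time: u64, capabilities: &[Option<u64>]) -> bool {
    match frontier(capabilities) {
        Some(least) => time < least,
        None => true, // every capability dropped: nothing more can arrive.
    }
}

fn main() {
    let mut capabilities: Vec<Option<u64>> = vec![Some(5), Some(9)];
    println!("frontier {:?}, time 5 complete? {}", frontier(&capabilities), is_complete(5, &capabilities));

    // Downgrading the first capability lets downstream conclude more times are done.
    capabilities[0] = Some(12);
    println!("frontier {:?}, time 5 complete? {}", frontier(&capabilities), is_complete(5, &capabilities));

    // Dropping a capability removes it from consideration entirely.
    capabilities[1] = None;
    println!("frontier {:?}", frontier(&capabilities));
}
```

The point of the model is that one source sitting on an old capability holds everyone back, which is why the operator needs to downgrade (or drop) promptly.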
Here is what we do:
// a source that attempts to pull from `send` and produce commands for everyone
source(dataflow, "SequenceInput", move |capability| {
    // so we can drop, if `send` vanishes.
    let mut capability = Some(capability);
    move |output| {
        // if `send` has not yet been dropped, ...
        if let Some(send_queue) = send_weak.upgrade() {
            // capability *should* still be non-None.
            let capability = capability.as_mut().expect("Capability unavailable");
            // downgrade capability to current time.
            let mut time = capability.time().clone();
            let elapsed = timer.elapsed();
            let elapsed_ns = (elapsed.as_secs() * 1000000000 + elapsed.subsec_nanos() as u64) as usize;
            time.inner = elapsed_ns;
            capability.downgrade(&time);
            // drain and broadcast `send`.
            let mut session = output.session(&capability);
            let mut borrow = send_queue.borrow_mut();
            for element in borrow.drain(..) {
                for worker_index in 0 .. peers {
                    session.give((worker_index, element.clone()));
                }
            }
        }
        else {
            // drop capability if `send` has been dropped.
            capability = None;
        }
    }
})
The comments may make things clear, but there are a few interesting moments. We re-wrap the capability in an Option<_> type, so that we can drop it (by setting it to None) should we determine that the operator can produce no more output; this happens when the input send weak reference cannot be upgraded (the strong reference has been dropped). If send is still live, we downgrade the capability to the current elapsed time according to timer; this allows the system-wide time to move forward, and allows downstream operators (the sink) to conclude that some times are complete. With the downgraded capability, we broadcast each element to all workers, timestamped with the current time.
The very next operator is a sink operator, which has one input and zero outputs.
The sink operator is a bit simpler than a source, because with no outputs it has no capabilities to manage. It is specified mainly by i. routing instructions for incoming data, and ii. a function that is repeatedly executed on an input handle. We want to route data by the worker_index we stashed in the first field; let's see what sort of function we might repeatedly execute:
.sink(
    Exchange::new(|x: &(usize, T)| x.0 as u64),
    "SequenceOutput",
    move |input| {
        // record each element, with timestamp.
        input.for_each(|time, data| {
            for (_worker, element) in data.drain(..) {
                recvd.push((time.time().clone(), element));
            }
        });
        // order by time, breaking ties by element.
        recvd.sort();
        // determine how many (which) elements to drain from `recvd`.
        let count = recvd.iter().filter(|&(ref time, _)| !input.frontier().less_equal(time)).count();
        let valid = recvd.drain(..count).map(|(_time, elem)| elem);
        // if `recv` un-dropped, move `valid` into `recv`.
        if let Some(recv_queue) = recv_weak.upgrade() {
            recv_queue.borrow_mut().extend(valid);
        }
    }
);
Let's break this down. We pull all of the data out of the input and stage it in recvd, and then sort this by timestamp (breaking ties by the element itself). We then determine how many staged records are ready to go: those whose timestamps are not greater than or equal to any timestamp that might still emerge (the input.frontier()). All of these records are extracted and, if recv is still around, appended to its end.
The correctness here results from the fact that we only publish records that will not be re-ordered, because timely guarantees we will not see such a small timestamp again.
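The sort-then-release step can be sketched on its own with plain integers. This is a simplification under a stated assumption: the frontier is modeled as a single u64 (the least time that might still arrive), whereas the operator above asks timely's frontier via less_equal. The function name release is made up for the example.

```rust
/// Sorts the staged `(time, element)` pairs and removes those whose time is
/// strictly less than `frontier`, returning their elements in order.
fn release<T: Ord>(staged: &mut Vec<(u64, T)>, frontier: u64) -> Vec<T> {
    // Order by time, breaking ties by element, as the sink does.
    staged.sort();
    // After sorting, the releasable records form a prefix of the list.
    let count = staged.iter().filter(|(time, _)| *time < frontier).count();
    staged.drain(..count).map(|(_time, element)| element).collect()
}

fn main() {
    // Records arrive in no particular order, possibly from different workers.
    let mut staged = vec![(30, "c"), (10, "b"), (20, "x"), (10, "a")];

    // Times before 25 can no longer appear, so those records are safe to publish.
    println!("released: {:?}", release(&mut staged, 25));
    println!("still staged: {:?}", staged);
}
```

Because every worker sorts the same records the same way and releases only below the frontier, every worker publishes the same sequence, just possibly at different moments.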
How would you use this? The ideal use cases are part of more complicated machinery, but let's do a small example to give the look and feel.
This example has workers repeatedly sleep for variable amounts of time, and produce announcements (strings) that go to the sequencer.
extern crate timely;
use std::time::{Instant, Duration};
use timely::synchronization::Sequencer;
fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let timer = Instant::now();
        let mut sequencer = Sequencer::new(worker, Instant::now());
        for round in 0 .. {
            std::thread::sleep(Duration::from_secs(1 + worker.index() as u64));
            sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
            while let Some(element) = sequencer.next() {
                println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
            }
            worker.step();
        }
    }).unwrap(); // asserts error-free execution;
}
Although the announcements are inserted at varying times, and although there should be some races (e.g. around seconds 2, 4, etc), we should see the same order of events for everyone.
Echidnatron% cargo run --example barrier-ex -- -w2
Compiling timely v0.6.0 (file:///Users/mcsherry/Projects/timely-master)
Finished dev [unoptimized + debuginfo] target(s) in 6.10s
Running `target/debug/examples/barrier-ex -w2`
5.019666549s: Worker 0: recv'd: "worker 0, round 0"
6.010601428s: Worker 1: recv'd: "worker 0, round 0"
7.026634193s: Worker 0: recv'd: "worker 1, round 0"
7.026702148s: Worker 0: recv'd: "worker 0, round 1"
7.026729358s: Worker 0: recv'd: "worker 0, round 2"
8.012440758s: Worker 1: recv'd: "worker 1, round 0"
8.012510943s: Worker 1: recv'd: "worker 0, round 1"
8.012537213s: Worker 1: recv'd: "worker 0, round 2"
9.036814332s: Worker 0: recv'd: "worker 1, round 1"
9.036883187s: Worker 0: recv'd: "worker 0, round 3"
9.036908927s: Worker 0: recv'd: "worker 0, round 4"
10.018041717s: Worker 1: recv'd: "worker 1, round 1"
10.018110227s: Worker 1: recv'd: "worker 0, round 3"
10.018137672s: Worker 1: recv'd: "worker 0, round 4"
11.047954807s: Worker 0: recv'd: "worker 1, round 2"
11.048023427s: Worker 0: recv'd: "worker 0, round 5"
11.048050622s: Worker 0: recv'd: "worker 0, round 6"
12.023656297s: Worker 1: recv'd: "worker 1, round 2"
12.023724582s: Worker 1: recv'd: "worker 0, round 5"
12.023750947s: Worker 1: recv'd: "worker 0, round 6"
13.054815266s: Worker 0: recv'd: "worker 1, round 3"
13.054883501s: Worker 0: recv'd: "worker 0, round 7"
13.054910841s: Worker 0: recv'd: "worker 0, round 8"
14.027085346s: Worker 1: recv'd: "worker 1, round 3"
14.027272343s: Worker 1: recv'd: "worker 0, round 7"
14.027289034s: Worker 1: recv'd: "worker 0, round 8"
^C
Echidnatron%
This is easier to confirm as correct if we group the output by the observing worker:
5.019666549s: Worker 0: recv'd: "worker 0, round 0"
7.026634193s: Worker 0: recv'd: "worker 1, round 0"
7.026702148s: Worker 0: recv'd: "worker 0, round 1"
7.026729358s: Worker 0: recv'd: "worker 0, round 2"
9.036814332s: Worker 0: recv'd: "worker 1, round 1"
9.036883187s: Worker 0: recv'd: "worker 0, round 3"
9.036908927s: Worker 0: recv'd: "worker 0, round 4"
11.047954807s: Worker 0: recv'd: "worker 1, round 2"
11.048023427s: Worker 0: recv'd: "worker 0, round 5"
11.048050622s: Worker 0: recv'd: "worker 0, round 6"
13.054815266s: Worker 0: recv'd: "worker 1, round 3"
13.054883501s: Worker 0: recv'd: "worker 0, round 7"
13.054910841s: Worker 0: recv'd: "worker 0, round 8"
and
6.010601428s: Worker 1: recv'd: "worker 0, round 0"
8.012440758s: Worker 1: recv'd: "worker 1, round 0"
8.012510943s: Worker 1: recv'd: "worker 0, round 1"
8.012537213s: Worker 1: recv'd: "worker 0, round 2"
10.018041717s: Worker 1: recv'd: "worker 1, round 1"
10.018110227s: Worker 1: recv'd: "worker 0, round 3"
10.018137672s: Worker 1: recv'd: "worker 0, round 4"
12.023656297s: Worker 1: recv'd: "worker 1, round 2"
12.023724582s: Worker 1: recv'd: "worker 0, round 5"
12.023750947s: Worker 1: recv'd: "worker 0, round 6"
14.027085346s: Worker 1: recv'd: "worker 1, round 3"
14.027272343s: Worker 1: recv'd: "worker 0, round 7"
14.027289034s: Worker 1: recv'd: "worker 0, round 8"
Looks pretty similar, even though the times at which these records get pulled out vary a fair bit.
Nice job us!