Skip to content

Commit

Permalink
guarded logging
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Lattuada <andrea.lattuada@inf.ethz.ch>
  • Loading branch information
utaal committed Dec 20, 2017
1 parent b57a360 commit 845940a
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 61 deletions.
18 changes: 8 additions & 10 deletions communication/src/allocator/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,19 @@ impl<T> Pusher<T> {
impl<T:Data> Push<T> for Pusher<T> {
#[inline] fn push(&mut self, element: &mut Option<T>) {
if let Some(ref mut element) = *element {
self.log_sender.log(
::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent {
self.log_sender.when_enabled(|l| l.log(::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent {
seq_no: Some(self.header.seqno),
is_start: true,
}));
})));
let mut bytes = Vec::new();
<T as Serialize>::into_bytes(element, &mut bytes);
let mut header = self.header;
header.length = bytes.len();
self.sender.send((header, bytes)).ok(); // TODO : should be unwrap()?
self.log_sender.log(
::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent {
self.log_sender.when_enabled(|l| l.log(::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent {
seq_no: Some(self.header.seqno),
is_start: true,
}));
})));
self.header.seqno += 1;
}
}
Expand All @@ -142,17 +140,17 @@ impl<T:Data> Pull<T> for Puller<T> {
if inner.is_some() { inner }
else {
self.current = self.receiver.try_recv().ok().map(|mut bytes| {
log_sender.log(
log_sender.when_enabled(|l| l.log(
::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent {
seq_no: None,
is_start: true,
}));
})));
let result = <T as Serialize>::from_bytes(&mut bytes);
log_sender.log(
log_sender.when_enabled(|l| l.log(
::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent {
seq_no: None,
is_start: false,
}));
})));
result
});
&mut self.current
Expand Down
10 changes: 4 additions & 6 deletions communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,13 @@ impl<R: Read> BinaryReceiver<R> {
let remaining = {
let mut slice = &self.buffer[..self.length];
while let Some(header) = MessageHeader::try_read(&mut slice) {
self.log_sender.log(
::timely_logging::CommsEvent::Communication(::logging::CommunicationEvent {
self.log_sender.when_enabled(|l| l.log(::timely_logging::CommsEvent::Communication(::logging::CommunicationEvent {
is_send: false,
comm_channel: header.channel,
source: header.source,
target: header.target,
seqno: header.seqno,
}));
})));
let h_len = header.length as usize; // length in bytes
let target = &mut self.targets.ensure(header.target, header.channel);
target.send(slice[..h_len].to_vec()).unwrap();
Expand Down Expand Up @@ -178,14 +177,13 @@ impl<W: Write> BinarySender<W> {

for (header, mut buffer) in stash.drain_temp() {
assert!(header.length == buffer.len());
self.log_sender.log(
::timely_logging::CommsEvent::Communication(::logging::CommunicationEvent {
self.log_sender.when_enabled(|l| l.log(::timely_logging::CommsEvent::Communication(::logging::CommunicationEvent {
is_send: true,
comm_channel: header.channel,
source: header.source,
target: header.target,
seqno: header.seqno,
}));
})));
header.write_to(&mut self.writer).unwrap();
self.writer.write_all(&buffer[..]).unwrap();
buffer.clear();
Expand Down
30 changes: 19 additions & 11 deletions logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,27 @@ pub enum LoggerBatch<S: Clone, L: Clone> {
End,
}

struct ActiveBufferingLogger<S: Clone, L: Clone> {
pub struct ActiveBufferingLogger<S: Clone, L: Clone> {
setup: S,
buffer: Vec<(u64, S, L)>,
pusher: Box<Fn(LoggerBatch<S, L>)->()>,
}

impl<S: Clone, L: Clone> ActiveBufferingLogger<S, L> {
pub fn log(&mut self, l: L) {
let ts = get_precise_time_ns();
self.buffer.push((ts, self.setup.clone(), l));
if self.buffer.len() >= BUFFERING_LOGGER_CAPACITY {
self.flush();
}
}

fn flush(&mut self) {
let mut buf = ::std::mem::replace(&mut self.buffer, Vec::new());
(self.pusher)(LoggerBatch::Logs(buf));
}
}

pub struct BufferingLogger<S: Clone, L: Clone> {
target: Option<RefCell<ActiveBufferingLogger<S, L>>>,
}
Expand All @@ -305,22 +320,15 @@ impl<S: Clone, L: Clone> BufferingLogger<S, L> {
})
}

pub fn log(&self, l: L) {
pub fn when_enabled<F: FnOnce(&mut ActiveBufferingLogger<S, L>)->()>(&self, f: F) {
if let Some(ref logger) = self.target {
let ActiveBufferingLogger { ref setup, ref mut buffer, .. } = *logger.borrow_mut();
let ts = get_precise_time_ns();
buffer.push((ts, setup.clone(), l));
if buffer.len() >= BUFFERING_LOGGER_CAPACITY {
self.flush();
}
f(&mut *logger.borrow_mut())
}
}

fn flush(&self) {
if let Some(ref logger) = self.target {
let ActiveBufferingLogger { ref mut buffer, ref pusher, .. } = *logger.borrow_mut();
let mut buf = ::std::mem::replace(buffer, Vec::new());
(pusher)(LoggerBatch::Logs(buf));
logger.borrow_mut().flush();
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ impl<T, D, P: Push<Message<T, D>>> Push<(T, Content<D>)> for Pusher<T, D, P> {
self.pusher.push(&mut message);
*pair = message.map(|x| (x.time, x.data));

self.logging.log(::timely_logging::Event::Messages(::timely_logging::MessagesEvent {
self.logging.when_enabled(|l| l.log(::timely_logging::Event::Messages(::timely_logging::MessagesEvent {
is_send: true,
channel: self.channel,
comm_channel: self.comm_channel,
source: self.source,
target: self.target,
seq_no: counter,
length: length,
}));
})));

// Log something about (index, counter, time?, length?);
}
Expand Down Expand Up @@ -186,15 +186,15 @@ impl<T, D, P: Pull<Message<T, D>>> Pull<(T, Content<D>)> for Puller<T, D, P> {

if let Some(message) = previous.as_ref() {

self.logging.log(::timely_logging::Event::Messages(::timely_logging::MessagesEvent {
self.logging.when_enabled(|l| l.log(::timely_logging::Event::Messages(::timely_logging::MessagesEvent {
is_send: false,
channel: self.channel,
comm_channel: self.comm_channel,
source: message.from,
target: self.index,
seq_no: message.seq,
length: message.data.len(),
}));
})));
}

self.current = previous.map(|message| (message.time, message.data));
Expand Down
8 changes: 4 additions & 4 deletions src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ impl<'a, T: Timestamp, D, P: Pull<(T, Content<D>)>> InputHandle<T, D, P> {
pub fn for_each<F: FnMut(Capability<T>, &mut Content<D>)>(&mut self, mut logic: F) {
let logging = self.logging.clone();
while let Some((cap, data)) = self.next() {
logging.log(::timely_logging::Event::GuardedMessage(
::timely_logging::GuardedMessageEvent { is_start: true }));
logging.when_enabled(|l| l.log(::timely_logging::Event::GuardedMessage(
::timely_logging::GuardedMessageEvent { is_start: true })));
logic(cap, data);
logging.log(::timely_logging::Event::GuardedMessage(
::timely_logging::GuardedMessageEvent { is_start: false }));
logging.when_enabled(|l| l.log(::timely_logging::Event::GuardedMessage(
::timely_logging::GuardedMessageEvent { is_start: false })));
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/dataflow/operators/generic/notificator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ impl<'a, T: Timestamp> Notificator<'a, T> {
#[inline]
pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
while let Some((cap, count)) = self.next() {
self.logging.log(::timely_logging::Event::GuardedProgress(
::timely_logging::GuardedProgressEvent { is_start: true }));
self.logging.when_enabled(|l| l.log(::timely_logging::Event::GuardedProgress(
::timely_logging::GuardedProgressEvent { is_start: true })));
logic(cap, count, self);
self.logging.log(::timely_logging::Event::GuardedProgress(
::timely_logging::GuardedProgressEvent { is_start: false }));
self.logging.when_enabled(|l| l.log(::timely_logging::Event::GuardedProgress(
::timely_logging::GuardedProgressEvent { is_start: false })));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ impl<S: Scope, D> Stream<S, D> {
pub fn connect_to<P: Push<(S::Timestamp, Content<D>)>+'static>(&self, target: Target, pusher: P, identifier: usize) {

let logging = self.scope().logging();
logging.log(::timely_logging::Event::Channels(::timely_logging::ChannelsEvent {
logging.when_enabled(|l| l.log(::timely_logging::Event::Channels(::timely_logging::ChannelsEvent {
id: identifier,
scope_addr: self.scope.addr(),
source: (self.name.index, self.name.port),
target: (target.index, target.port),
}));
})));

self.scope.add_edge(self.name, target);
self.ports.add_pusher(pusher);
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
//! We then introduce input at increasing rounds, indicate the advance to the system (promising
//! that we will introduce no more input at prior rounds), and step the computation.
#![forbid(missing_docs)]
// #![forbid(missing_docs)]

extern crate abomonation;
extern crate timely_communication;
Expand Down
19 changes: 11 additions & 8 deletions src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ impl<T:Timestamp+Send> Progcaster<T> {
/// Creates a new `Progcaster` using a channel from the supplied allocator.
pub fn new<A: Allocate>(allocator: &mut A, path: &Vec<usize>, logging: Logger) -> Progcaster<T> {
let (pushers, puller, chan) = allocator.allocate();
logging.log(::timely_logging::Event::CommChannels(::timely_logging::CommChannelsEvent {
logging.when_enabled(|l| l.log(::timely_logging::Event::CommChannels(::timely_logging::CommChannelsEvent {
comm_channel: chan,
comm_channel_kind: ::timely_logging::CommChannelKind::Progress,
}));
})));
let worker = allocator.index();
let addr = path.clone();
Progcaster { pushers: pushers, puller: puller, source: worker,
Expand All @@ -54,7 +54,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
{
if self.pushers.len() > 1 { // if the length is one, just return the updates...
if !messages.is_empty() || !internal.is_empty() {
self.logging.log(::timely_logging::Event::Progress(::timely_logging::ProgressEvent {
self.logging.when_enabled(|l| l.log(::timely_logging::Event::Progress(::timely_logging::ProgressEvent {
is_send: true,
source: self.source,
comm_channel: self.comm_channel,
Expand All @@ -63,7 +63,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
// TODO: fill with additional data
messages: Vec::new(),
internal: Vec::new(),
}));
})));

for pusher in self.pushers.iter_mut() {
pusher.push(&mut Some((self.source, self.counter, messages.clone().into_inner(), internal.clone().into_inner())));
Expand All @@ -79,16 +79,19 @@ impl<T:Timestamp+Send> Progcaster<T> {
// TODO : Could take ownership, and recycle / reuse for next broadcast ...
while let Some((ref source, ref counter, ref mut recv_messages, ref mut recv_internal)) = *self.puller.pull() {

self.logging.log(::timely_logging::Event::Progress(::timely_logging::ProgressEvent {
let comm_channel = self.comm_channel;
// TODO(andreal) can we do something about this unnecessary clone when logging is disabled?
let addr = self.addr.clone();
self.logging.when_enabled(|l| l.log(::timely_logging::Event::Progress(::timely_logging::ProgressEvent {
is_send: false,
source: *source,
seq_no: *counter,
comm_channel: self.comm_channel,
addr: self.addr.clone(),
comm_channel,
addr,
// TODO: fill with additional data
messages: Vec::new(),
internal: Vec::new(),
}));
})));

for (update, delta) in recv_messages.drain(..) {
messages.update(update, delta);
Expand Down
25 changes: 14 additions & 11 deletions src/progress/nested/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,11 +544,11 @@ impl<TOuter: Timestamp, TInner: Timestamp> SubgraphBuilder<TOuter, TInner> {
{
let mut child_path = self.path.clone();
child_path.push(index);
self.logging.log(::timely_logging::Event::Operates(::timely_logging::OperatesEvent {
self.logging.when_enabled(|l| l.log(::timely_logging::Event::Operates(::timely_logging::OperatesEvent {
id: identifier,
addr: child_path,
name: child.name().to_owned(),
}));
})));
}
self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
}
Expand Down Expand Up @@ -924,11 +924,14 @@ impl<T: Timestamp> PerOperatorState<T> {

{
let changes = &mut self.external_buffer;
if changes.iter_mut().any(|ref mut c| !c.is_empty()) {
self.logging.log(::timely_logging::Event::PushProgress(::timely_logging::PushProgressEvent {
op_id: self.id,
}));
}
let id = self.id;
self.logging.when_enabled(|l| {
if changes.iter_mut().any(|ref mut c| !c.is_empty()) {
l.log(::timely_logging::Event::PushProgress(::timely_logging::PushProgressEvent {
op_id: id,
}));
}
});
self.operator.as_mut().map(|x| x.push_external_progress(changes));
if changes.iter_mut().any(|x| !x.is_empty()) {
println!("changes not consumed by {:?}", self.name);
Expand All @@ -942,9 +945,9 @@ impl<T: Timestamp> PerOperatorState<T> {

let active = {

self.logging.log(::timely_logging::Event::Schedule(::timely_logging::ScheduleEvent {
self.logging.when_enabled(|l| l.log(::timely_logging::Event::Schedule(::timely_logging::ScheduleEvent {
id: self.id, start_stop: ::timely_logging::StartStop::Start
}));
})));

debug_assert!(self.consumed_buffer.iter_mut().all(|cm| cm.is_empty()));
debug_assert!(self.internal_buffer.iter_mut().all(|cm| cm.is_empty()));
Expand All @@ -965,9 +968,9 @@ impl<T: Timestamp> PerOperatorState<T> {
self.internal_buffer.iter_mut().any(|cm| !cm.is_empty()) ||
self.produced_buffer.iter_mut().any(|cm| !cm.is_empty());

self.logging.log(::timely_logging::Event::Schedule(::timely_logging::ScheduleEvent {
self.logging.when_enabled(|l| l.log(::timely_logging::Event::Schedule(::timely_logging::ScheduleEvent {
id: self.id, start_stop: ::timely_logging::StartStop::Stop { activity: did_work }
}));
})));

result
};
Expand Down

0 comments on commit 845940a

Please sign in to comment.