diff --git a/communication/src/allocator/binary.rs b/communication/src/allocator/binary.rs index 38c15a575..999c95c3e 100644 --- a/communication/src/allocator/binary.rs +++ b/communication/src/allocator/binary.rs @@ -102,21 +102,19 @@ impl Pusher { impl Push for Pusher { #[inline] fn push(&mut self, element: &mut Option) { if let Some(ref mut element) = *element { - self.log_sender.log( - ::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent { + self.log_sender.when_enabled(|l| l.log(::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent { seq_no: Some(self.header.seqno), is_start: true, - })); + }))); let mut bytes = Vec::new(); ::into_bytes(element, &mut bytes); let mut header = self.header; header.length = bytes.len(); self.sender.send((header, bytes)).ok(); // TODO : should be unwrap()? - self.log_sender.log( - ::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent { + self.log_sender.when_enabled(|l| l.log(::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent { seq_no: Some(self.header.seqno), is_start: true, - })); + }))); self.header.seqno += 1; } } @@ -142,17 +140,17 @@ impl Pull for Puller { if inner.is_some() { inner } else { self.current = self.receiver.try_recv().ok().map(|mut bytes| { - log_sender.log( + log_sender.when_enabled(|l| l.log( ::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent { seq_no: None, is_start: true, - })); + }))); let result = ::from_bytes(&mut bytes); - log_sender.log( + log_sender.when_enabled(|l| l.log( ::timely_logging::CommsEvent::Serialization(::timely_logging::SerializationEvent { seq_no: None, is_start: false, - })); + }))); result }); &mut self.current diff --git a/communication/src/networking.rs b/communication/src/networking.rs index b586ad48a..c67e576f2 100644 --- a/communication/src/networking.rs +++ b/communication/src/networking.rs @@ -116,14 +116,13 @@ impl BinaryReceiver { let remaining = { let mut slice = &self.buffer[..self.length]; while let Some(header) = MessageHeader::try_read(&mut slice) { - self.log_sender.log( - ::timely_logging::CommsEvent::Communication(::logging::CommunicationEvent { + self.log_sender.when_enabled(|l| l.log(::timely_logging::CommsEvent::Communication(::logging::CommunicationEvent { is_send: false, comm_channel: header.channel, source: header.source, target: header.target, seqno: header.seqno, - })); + }))); let h_len = header.length as usize; // length in bytes let target = &mut self.targets.ensure(header.target, header.channel); target.send(slice[..h_len].to_vec()).unwrap(); @@ -178,14 +177,13 @@ impl BinarySender { for (header, mut buffer) in stash.drain_temp() { assert!(header.length == buffer.len()); - self.log_sender.log( - ::timely_logging::CommsEvent::Communication(::logging::CommunicationEvent { + self.log_sender.when_enabled(|l| l.log(::timely_logging::CommsEvent::Communication(::logging::CommunicationEvent { is_send: true, comm_channel: header.channel, source: header.source, target: header.target, seqno: header.seqno, - })); + }))); header.write_to(&mut self.writer).unwrap(); self.writer.write_all(&buffer[..]).unwrap(); buffer.clear(); diff --git a/logging/src/lib.rs b/logging/src/lib.rs index e459abdcb..7335f77a3 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -278,12 +278,27 @@ pub enum LoggerBatch { End, } -struct ActiveBufferingLogger { +pub struct ActiveBufferingLogger { setup: S, buffer: Vec<(u64, S, L)>, pusher: Box)->()>, } +impl ActiveBufferingLogger { + pub fn log(&mut self, l: L) { + let ts = get_precise_time_ns(); + self.buffer.push((ts, self.setup.clone(), l)); + if self.buffer.len() >= BUFFERING_LOGGER_CAPACITY { + self.flush(); + } + } + + fn flush(&mut self) { + let mut buf = ::std::mem::replace(&mut self.buffer, Vec::new()); + (self.pusher)(LoggerBatch::Logs(buf)); + } +} + pub struct BufferingLogger { target: Option>>, } @@ -305,22 +320,15 @@ impl BufferingLogger { }) } - pub fn log(&self, l: L) { + pub fn when_enabled)->()>(&self, f: F) { if let Some(ref logger) = self.target { - let ActiveBufferingLogger { ref setup, ref mut buffer, .. } = *logger.borrow_mut(); - let ts = get_precise_time_ns(); - buffer.push((ts, setup.clone(), l)); - if buffer.len() >= BUFFERING_LOGGER_CAPACITY { - self.flush(); - } + f(&mut *logger.borrow_mut()) } } fn flush(&self) { if let Some(ref logger) = self.target { - let ActiveBufferingLogger { ref mut buffer, ref pusher, .. } = *logger.borrow_mut(); - let mut buf = ::std::mem::replace(buffer, Vec::new()); - (pusher)(LoggerBatch::Logs(buf)); + logger.borrow_mut().flush(); } } } diff --git a/src/dataflow/channels/pact.rs b/src/dataflow/channels/pact.rs index 84054c73d..a72f1d299 100644 --- a/src/dataflow/channels/pact.rs +++ b/src/dataflow/channels/pact.rs @@ -135,7 +135,7 @@ impl>> Push<(T, Content)> for Pusher { self.pusher.push(&mut message); *pair = message.map(|x| (x.time, x.data)); - self.logging.log(::timely_logging::Event::Messages(::timely_logging::MessagesEvent { + self.logging.when_enabled(|l| l.log(::timely_logging::Event::Messages(::timely_logging::MessagesEvent { is_send: true, channel: self.channel, comm_channel: self.comm_channel, @@ -143,7 +143,7 @@ impl>> Push<(T, Content)> for Pusher { target: self.target, seq_no: counter, length: length, - })); + }))); // Log something about (index, counter, time?, length?); } @@ -186,7 +186,7 @@ impl>> Pull<(T, Content)> for Puller { if let Some(message) = previous.as_ref() { - self.logging.log(::timely_logging::Event::Messages(::timely_logging::MessagesEvent { + self.logging.when_enabled(|l| l.log(::timely_logging::Event::Messages(::timely_logging::MessagesEvent { is_send: false, channel: self.channel, comm_channel: self.comm_channel, @@ -194,7 +194,7 @@ impl>> Pull<(T, Content)> for Puller { target: self.index, seq_no: message.seq, length: message.data.len(), - })); + }))); } self.current = previous.map(|message| (message.time, message.data)); diff --git a/src/dataflow/operators/generic/handles.rs b/src/dataflow/operators/generic/handles.rs index 6d2d20f28..73960e7c3 100644 --- a/src/dataflow/operators/generic/handles.rs +++ b/src/dataflow/operators/generic/handles.rs @@ -68,11 +68,11 @@ impl<'a, T: Timestamp, D, P: Pull<(T, Content)>> InputHandle { pub fn for_each, &mut Content)>(&mut self, mut logic: F) { let logging = self.logging.clone(); while let Some((cap, data)) = self.next() { - logging.log(::timely_logging::Event::GuardedMessage( - ::timely_logging::GuardedMessageEvent { is_start: true })); + logging.when_enabled(|l| l.log(::timely_logging::Event::GuardedMessage( + ::timely_logging::GuardedMessageEvent { is_start: true }))); logic(cap, data); - logging.log(::timely_logging::Event::GuardedMessage( - ::timely_logging::GuardedMessageEvent { is_start: false })); + logging.when_enabled(|l| l.log(::timely_logging::Event::GuardedMessage( + ::timely_logging::GuardedMessageEvent { is_start: false }))); } } diff --git a/src/dataflow/operators/generic/notificator.rs b/src/dataflow/operators/generic/notificator.rs index 4051b98ed..488eaf4d2 100644 --- a/src/dataflow/operators/generic/notificator.rs +++ b/src/dataflow/operators/generic/notificator.rs @@ -81,11 +81,11 @@ impl<'a, T: Timestamp> Notificator<'a, T> { #[inline] pub fn for_each, u64, &mut Notificator)>(&mut self, mut logic: F) { while let Some((cap, count)) = self.next() { - self.logging.log(::timely_logging::Event::GuardedProgress( - ::timely_logging::GuardedProgressEvent { is_start: true })); + self.logging.when_enabled(|l| l.log(::timely_logging::Event::GuardedProgress( + ::timely_logging::GuardedProgressEvent { is_start: true }))); logic(cap, count, self); - self.logging.log(::timely_logging::Event::GuardedProgress( - ::timely_logging::GuardedProgressEvent { is_start: false })); + self.logging.when_enabled(|l| l.log(::timely_logging::Event::GuardedProgress( + ::timely_logging::GuardedProgressEvent { is_start: false }))); } } } diff --git a/src/dataflow/stream.rs b/src/dataflow/stream.rs index a27979df1..147ea5d75 100644 --- a/src/dataflow/stream.rs +++ b/src/dataflow/stream.rs @@ -35,12 +35,12 @@ impl Stream { pub fn connect_to)>+'static>(&self, target: Target, pusher: P, identifier: usize) { let logging = self.scope().logging(); - logging.log(::timely_logging::Event::Channels(::timely_logging::ChannelsEvent { + logging.when_enabled(|l| l.log(::timely_logging::Event::Channels(::timely_logging::ChannelsEvent { id: identifier, scope_addr: self.scope.addr(), source: (self.name.index, self.name.port), target: (target.index, target.port), - })); + }))); self.scope.add_edge(self.name, target); self.ports.add_pusher(pusher); diff --git a/src/lib.rs b/src/lib.rs index 06c2bbc14..92d8b9b7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,7 @@ //! We then introduce input at increasing rounds, indicate the advance to the system (promising //! that we will introduce no more input at prior rounds), and step the computation. -#![forbid(missing_docs)] +// #![forbid(missing_docs)] extern crate abomonation; extern crate timely_communication; diff --git a/src/progress/broadcast.rs b/src/progress/broadcast.rs index 9108172d2..d155c1ca7 100644 --- a/src/progress/broadcast.rs +++ b/src/progress/broadcast.rs @@ -32,10 +32,10 @@ impl Progcaster { /// Creates a new `Progcaster` using a channel from the supplied allocator. pub fn new(allocator: &mut A, path: &Vec, logging: Logger) -> Progcaster { let (pushers, puller, chan) = allocator.allocate(); - logging.log(::timely_logging::Event::CommChannels(::timely_logging::CommChannelsEvent { + logging.when_enabled(|l| l.log(::timely_logging::Event::CommChannels(::timely_logging::CommChannelsEvent { comm_channel: chan, comm_channel_kind: ::timely_logging::CommChannelKind::Progress, - })); + }))); let worker = allocator.index(); let addr = path.clone(); Progcaster { pushers: pushers, puller: puller, source: worker, @@ -54,7 +54,7 @@ impl Progcaster { { if self.pushers.len() > 1 { // if the length is one, just return the updates... if !messages.is_empty() || !internal.is_empty() { - self.logging.log(::timely_logging::Event::Progress(::timely_logging::ProgressEvent { + self.logging.when_enabled(|l| l.log(::timely_logging::Event::Progress(::timely_logging::ProgressEvent { is_send: true, source: self.source, comm_channel: self.comm_channel, @@ -63,7 +63,7 @@ impl Progcaster { // TODO: fill with additional data messages: Vec::new(), internal: Vec::new(), - })); + }))); for pusher in self.pushers.iter_mut() { pusher.push(&mut Some((self.source, self.counter, messages.clone().into_inner(), internal.clone().into_inner()))); @@ -79,16 +79,19 @@ impl Progcaster { // TODO : Could take ownership, and recycle / reuse for next broadcast ... while let Some((ref source, ref counter, ref mut recv_messages, ref mut recv_internal)) = *self.puller.pull() { - self.logging.log(::timely_logging::Event::Progress(::timely_logging::ProgressEvent { + let comm_channel = self.comm_channel; + // TODO(andreal) can we do something about this unnecessary clone when logging is disabled? + let addr = self.addr.clone(); + self.logging.when_enabled(|l| l.log(::timely_logging::Event::Progress(::timely_logging::ProgressEvent { is_send: false, source: *source, seq_no: *counter, - comm_channel: self.comm_channel, - addr: self.addr.clone(), + comm_channel, + addr, // TODO: fill with additional data messages: Vec::new(), internal: Vec::new(), - })); + }))); for (update, delta) in recv_messages.drain(..) { messages.update(update, delta); diff --git a/src/progress/nested/subgraph.rs b/src/progress/nested/subgraph.rs index 3172c5291..e475a2900 100644 --- a/src/progress/nested/subgraph.rs +++ b/src/progress/nested/subgraph.rs @@ -544,11 +544,11 @@ impl SubgraphBuilder { { let mut child_path = self.path.clone(); child_path.push(index); - self.logging.log(::timely_logging::Event::Operates(::timely_logging::OperatesEvent { + self.logging.when_enabled(|l| l.log(::timely_logging::Event::Operates(::timely_logging::OperatesEvent { id: identifier, addr: child_path, name: child.name().to_owned(), - })); + }))); } self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone())) } @@ -924,11 +924,14 @@ impl PerOperatorState { { let changes = &mut self.external_buffer; - if changes.iter_mut().any(|ref mut c| !c.is_empty()) { - self.logging.log(::timely_logging::Event::PushProgress(::timely_logging::PushProgressEvent { - op_id: self.id, - })); - } + let id = self.id; + self.logging.when_enabled(|l| { + if changes.iter_mut().any(|ref mut c| !c.is_empty()) { + l.log(::timely_logging::Event::PushProgress(::timely_logging::PushProgressEvent { + op_id: id, + })); + } + }); self.operator.as_mut().map(|x| x.push_external_progress(changes)); if changes.iter_mut().any(|x| !x.is_empty()) { println!("changes not consumed by {:?}", self.name); @@ -942,9 +945,9 @@ impl PerOperatorState { let active = { - self.logging.log(::timely_logging::Event::Schedule(::timely_logging::ScheduleEvent { + self.logging.when_enabled(|l| l.log(::timely_logging::Event::Schedule(::timely_logging::ScheduleEvent { id: self.id, start_stop: ::timely_logging::StartStop::Start - })); + }))); debug_assert!(self.consumed_buffer.iter_mut().all(|cm| cm.is_empty())); debug_assert!(self.internal_buffer.iter_mut().all(|cm| cm.is_empty())); @@ -965,9 +968,9 @@ impl PerOperatorState { self.internal_buffer.iter_mut().any(|cm| !cm.is_empty()) || self.produced_buffer.iter_mut().any(|cm| !cm.is_empty()); - self.logging.log(::timely_logging::Event::Schedule(::timely_logging::ScheduleEvent { + self.logging.when_enabled(|l| l.log(::timely_logging::Event::Schedule(::timely_logging::ScheduleEvent { id: self.id, start_stop: ::timely_logging::StartStop::Stop { activity: did_work } - })); + }))); result };