diff --git a/communication/src/logging.rs b/communication/src/logging.rs index 1ce8608a1..0b4c2b5f2 100644 --- a/communication/src/logging.rs +++ b/communication/src/logging.rs @@ -4,5 +4,5 @@ use timely_logging::{CommsEvent, CommsSetup}; pub use timely_logging::CommunicationEvent; pub use timely_logging::SerializationEvent; -/// TODO(andreal) +/// A log writer for a communication thread. pub type CommsLogger = Rc<::timely_logging::BufferingLogger>; diff --git a/logging/src/lib.rs b/logging/src/lib.rs index 3ba8c5803..72b3b8af4 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -29,17 +29,6 @@ pub fn get_precise_time_ns() -> u64 { (time::precise_time_ns() as i64 - delta) as u64 } -// /// TODO(andreal) -// fn initialize_precise_time_ns() { -// unsafe { -// precise_time_ns_delta = Some({ -// let wall_time = time::get_time(); -// let wall_time_ns = wall_time.nsec as i64 + wall_time.sec * 1000000000; -// time::precise_time_ns() as i64 - wall_time_ns -// }); -// } -// } - /// Logging methods pub trait Logger { /// The type of loggable record. diff --git a/src/dataflow/scopes/child.rs b/src/dataflow/scopes/child.rs index 2f7c6ce0a..1df516b97 100644 --- a/src/dataflow/scopes/child.rs +++ b/src/dataflow/scopes/child.rs @@ -18,7 +18,7 @@ pub struct Child<'a, G: ScopeParent, T: Timestamp> { pub subgraph: &'a RefCell>, /// A copy of the child's parent scope. pub parent: G, - /// TODO(andreal) + /// The log writer for this scope. pub logging: Logger, } diff --git a/src/dataflow/scopes/mod.rs b/src/dataflow/scopes/mod.rs index 4b7665385..687c3fee2 100644 --- a/src/dataflow/scopes/mod.rs +++ b/src/dataflow/scopes/mod.rs @@ -76,6 +76,6 @@ pub trait Scope: ScopeParent { /// ``` fn scoped)->R>(&mut self, func: F) -> R; - /// TODO(andreal) + /// Obtains the logger associated with this scope. fn logging(&self) -> Logger; } diff --git a/src/dataflow/scopes/root.rs b/src/dataflow/scopes/root.rs index df2b62030..b651d7bf4 100644 --- a/src/dataflow/scopes/root.rs +++ b/src/dataflow/scopes/root.rs @@ -52,7 +52,7 @@ impl Root { // discard completed dataflows. self.dataflows.borrow_mut().retain(|dataflow| dataflow.active()); - // TODO(andreal) flush logs? + // TODO(andreal) do we want to flush logs here? active } diff --git a/src/logging.rs b/src/logging.rs index 719e82b6c..46876da74 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -25,10 +25,10 @@ use timely_logging::LoggerBatch; type LogMessage = (u64, EventsSetup, LogEvent); type CommsMessage = (u64, CommsSetup, CommsEvent); -/// TODO(andreal) +/// A log writer. pub type Logger = Rc>; -/// TODO(andreal) +/// A log writer that does not log anything. pub fn new_inactive_logger() -> Logger { BufferingLogger::<(), ()>::new_inactive() } @@ -83,37 +83,22 @@ impl LogManager { } } -struct SharedVec { - inner: Arc>>, +struct SharedEventWriter { + inner: Arc>>, } -impl SharedVec { - pub fn new(inner: Arc>>) -> Self { - SharedVec { - inner: inner, +impl Clone for SharedEventWriter { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), } } } -impl Write for SharedVec { - fn write(&mut self, data: &[u8]) -> Result { - self.inner.lock().unwrap().extend_from_slice(data); - Ok(data.len()) - } - - fn flush(&mut self) -> Result<(), ::std::io::Error> { - Ok(()) - } -} - -struct SharedEventWriter { - inner: Mutex>, -} - impl SharedEventWriter { fn new(w: W) -> Self { SharedEventWriter { - inner: Mutex::new(EventWriter::new(w)), + inner: Arc::new(Mutex::new(EventWriter::new(w))), } } } @@ -125,7 +110,7 @@ impl EventPusher for } } -/// TODO(andreal) +/// An handle to the LogManager that constructs a filter for the log records. pub struct FilteredLogManager { log_manager: Arc>, filter: Arcbool+Send+Sync>, @@ -133,8 +118,10 @@ pub struct FilteredLogManager { } impl FilteredLogManager { - /// TODO(andreal) - pub fn to_tcp_socket(&mut self) { + /// Send the selected timely events to one tcp socket per log sender. + /// + /// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable. + pub fn to_tcp_sockets(&mut self) { let target: String = ::std::env::var("TIMELY_LOG_TARGET").expect("no $TIMELY_LOG_TARGET, e.g. 127.0.0.1:34254"); // let writer = SharedEventWriter::new(writer); @@ -145,24 +132,20 @@ impl FilteredLogManager { self.log_manager.lock().unwrap().add_timely_subscription(self.filter.clone(), pusher); } - // /// TODO(andreal) - // pub fn to_bufs(&mut self) -> Vec>>> { - // let mut vecs = Vec::new(); - - // for i in 0..4 { - // let buf = Arc::new(Mutex::new(Vec::::with_capacity(4_000_000_000))); - // let writer = SharedEventWriter::new(SharedVec::new(buf.clone())); - // let pusher: Arc, LogMessage>+Send+Sync> = Arc::new(writer); - // self.log_manager.lock().unwrap().add_timely_subscription(Arc::new(move |s| s.index == i), pusher); - // vecs.push(buf); - // } - - // vecs - // } + /// Send the selected timely events to a shared tcp socket. + /// + /// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable. + pub fn to_shared_tcp_socket(&mut self) { + let target: String = ::std::env::var("TIMELY_LOG_TARGET").expect("no $TIMELY_LOG_TARGET, e.g. 127.0.0.1:34254"); + let pusher = Box::new(SharedEventWriter::new(TcpStream::connect(target.clone()).expect("failed to connect to logging destination"))); + self.log_manager.lock().unwrap().add_timely_subscription(self.filter.clone(), Arc::new(move || pusher.clone())); + } } impl FilteredLogManager { - /// TODO(andreal) + /// Send the selected communication events to a tcp socket. + /// + /// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable. pub fn to_tcp_socket(&mut self) { let comm_target = ::std::env::var("TIMELY_COMM_LOG_TARGET").expect("no $TIMELY_COMM_LOG_TARGET, e.g. 127.0.0.1:34255"); @@ -175,7 +158,7 @@ impl FilteredLogManager { } impl LogManager { - /// TODO(andreal) + /// Constructs a new LogManager. pub fn new() -> Arc> { Arc::new(Mutex::new(LogManager { timely_logs: HashMap::new(), @@ -186,17 +169,16 @@ impl LogManager { } } -/// TODO(andreal) +/// Functions to construct log filters. pub trait LogFilter { - /// TODO(andreal) + /// Get a `FilteredLogManager` for all workers. fn workers(&mut self) -> FilteredLogManager; - /// TODO(andreal) + /// Get a `FilteredLogManager` for all communication threads. fn comms(&mut self) -> FilteredLogManager; } impl LogFilter for Arc> { - /// TODO(andreal) #[inline] fn workers(&mut self) -> FilteredLogManager { FilteredLogManager { log_manager: self.clone(), @@ -205,7 +187,6 @@ impl LogFilter for Arc> { } } - /// TODO(andreal) #[inline] fn comms(&mut self) -> FilteredLogManager { FilteredLogManager { log_manager: self.clone(), @@ -215,11 +196,11 @@ impl LogFilter for Arc> { } } -/// TODO(andreal) +/// Shared wrapper for log writer constructors. pub struct LoggerConfig { - /// TODO(andreal) + /// Log writer constructors. pub timely_logging: ArcRc>+Send+Sync>, - /// TODO(andreal) + /// Log writer constructors for communication. pub communication_logging: ArcRc>+Send+Sync>, } @@ -249,7 +230,7 @@ impl LoggerConfig { event_manager } - /// TODO(andreal) + /// Makes a new `LoggerConfig` wrapper from a `LogManager`. pub fn new(log_manager: Arc>) -> Self { let timely_logging_manager = log_manager.clone(); let communication_logging_manager = log_manager; @@ -257,7 +238,6 @@ impl LoggerConfig { timely_logging: Arc::new(move |events_setup: EventsSetup| { let subscription_manager = LoggerConfig::register_timely_logger( &mut timely_logging_manager.lock().unwrap(), events_setup); - //eprintln!("registered timely logger: {:?}", events_setup); Rc::new(BufferingLogger::new(events_setup, Box::new(move |data| { subscription_manager.lock().expect("cannot lock mutex").publish_batch(data); }))) @@ -265,7 +245,6 @@ impl LoggerConfig { communication_logging: Arc::new(move |comms_setup: CommsSetup| { let subscription_manager = LoggerConfig::register_comms_logger( &mut communication_logging_manager.lock().unwrap(), comms_setup); - //eprintln!("registered comm logger: {:?}", comms_setup); Rc::new(BufferingLogger::new(comms_setup, Box::new(move |data| { subscription_manager.lock().expect("cannot lock mutex").publish_batch(data); }))) diff --git a/src/progress/nested/subgraph.rs b/src/progress/nested/subgraph.rs index de864bf3f..b8902d748 100644 --- a/src/progress/nested/subgraph.rs +++ b/src/progress/nested/subgraph.rs @@ -946,11 +946,9 @@ impl PerOperatorState { id: self.id, start_stop: ::timely_logging::StartStop::Start })); - // TODO(andreal) formerly behind "logging" feature flag { - // assert!(self.consumed_buffer.iter_mut().all(|cm| cm.is_empty())); - // assert!(self.internal_buffer.iter_mut().all(|cm| cm.is_empty())); - // assert!(self.produced_buffer.iter_mut().all(|cm| cm.is_empty())); - // } + debug_assert!(self.consumed_buffer.iter_mut().all(|cm| cm.is_empty())); + debug_assert!(self.internal_buffer.iter_mut().all(|cm| cm.is_empty())); + debug_assert!(self.produced_buffer.iter_mut().all(|cm| cm.is_empty())); let result = if let Some(ref mut operator) = self.operator { operator.pull_internal_progress(