Skip to content

Commit

Permalink
better docs, no warnings, and to_shared_tcp_socket
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Lattuada <andrea.lattuada@inf.ethz.ch>
  • Loading branch information
utaal committed Dec 6, 2017
1 parent 65f3824 commit 9f3fea5
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 74 deletions.
2 changes: 1 addition & 1 deletion communication/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ use timely_logging::{CommsEvent, CommsSetup};
pub use timely_logging::CommunicationEvent;
pub use timely_logging::SerializationEvent;

/// TODO(andreal)
/// A log writer for a communication thread.
pub type CommsLogger = Rc<::timely_logging::BufferingLogger<CommsSetup, CommsEvent>>;
11 changes: 0 additions & 11 deletions logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,6 @@ pub fn get_precise_time_ns() -> u64 {
(time::precise_time_ns() as i64 - delta) as u64
}

// /// TODO(andreal)
// fn initialize_precise_time_ns() {
// unsafe {
// precise_time_ns_delta = Some({
// let wall_time = time::get_time();
// let wall_time_ns = wall_time.nsec as i64 + wall_time.sec * 1000000000;
// time::precise_time_ns() as i64 - wall_time_ns
// });
// }
// }

/// Logging methods
pub trait Logger {
/// The type of loggable record.
Expand Down
2 changes: 1 addition & 1 deletion src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Child<'a, G: ScopeParent, T: Timestamp> {
pub subgraph: &'a RefCell<SubgraphBuilder<G::Timestamp, T>>,
/// A copy of the child's parent scope.
pub parent: G,
/// TODO(andreal)
/// The log writer for this scope.
pub logging: Logger,
}

Expand Down
2 changes: 1 addition & 1 deletion src/dataflow/scopes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ pub trait Scope: ScopeParent {
/// ```
fn scoped<T: Timestamp, R, F:FnOnce(&mut Child<Self, T>)->R>(&mut self, func: F) -> R;

/// TODO(andreal)
/// Obtains the logger associated with this scope.
fn logging(&self) -> Logger;
}
2 changes: 1 addition & 1 deletion src/dataflow/scopes/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<A: Allocate> Root<A> {
// discard completed dataflows.
self.dataflows.borrow_mut().retain(|dataflow| dataflow.active());

// TODO(andreal) flush logs?
// TODO(andreal) do we want to flush logs here?

active
}
Expand Down
87 changes: 33 additions & 54 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use timely_logging::LoggerBatch;
type LogMessage = (u64, EventsSetup, LogEvent);
type CommsMessage = (u64, CommsSetup, CommsEvent);

/// TODO(andreal)
/// A log writer.
pub type Logger = Rc<BufferingLogger<EventsSetup, LogEvent>>;

/// TODO(andreal)
/// A log writer that does not log anything.
pub fn new_inactive_logger() -> Logger {
BufferingLogger::<(), ()>::new_inactive()
}
Expand Down Expand Up @@ -83,37 +83,22 @@ impl LogManager {
}
}

struct SharedVec {
inner: Arc<Mutex<Vec<u8>>>,
struct SharedEventWriter<T, D, W: Write> {
inner: Arc<Mutex<EventWriter<T, D, W>>>,
}

impl SharedVec {
pub fn new(inner: Arc<Mutex<Vec<u8>>>) -> Self {
SharedVec {
inner: inner,
impl<T, D, W: Write> Clone for SharedEventWriter<T, D, W> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}

impl Write for SharedVec {
fn write(&mut self, data: &[u8]) -> Result<usize, ::std::io::Error> {
self.inner.lock().unwrap().extend_from_slice(data);
Ok(data.len())
}

fn flush(&mut self) -> Result<(), ::std::io::Error> {
Ok(())
}
}

struct SharedEventWriter<T, D, W: Write> {
inner: Mutex<EventWriter<T, D, W>>,
}

impl<T, D, W: Write> SharedEventWriter<T, D, W> {
fn new(w: W) -> Self {
SharedEventWriter {
inner: Mutex::new(EventWriter::new(w)),
inner: Arc::new(Mutex::new(EventWriter::new(w))),
}
}
}
Expand All @@ -125,16 +110,18 @@ impl<T: Abomonation+Debug, D: Abomonation+Debug, W: Write> EventPusher<T, D> for
}
}

/// TODO(andreal)
/// An handle to the LogManager that constructs a filter for the log records.
pub struct FilteredLogManager<S, E> {
log_manager: Arc<Mutex<LogManager>>,
filter: Arc<Fn(&S)->bool+Send+Sync>,
_e: ::std::marker::PhantomData<E>,
}

impl FilteredLogManager<EventsSetup, LogEvent> {
/// TODO(andreal)
pub fn to_tcp_socket(&mut self) {
/// Send the selected timely events to one tcp socket per log sender.
///
/// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable.
pub fn to_tcp_sockets(&mut self) {
let target: String = ::std::env::var("TIMELY_LOG_TARGET").expect("no $TIMELY_LOG_TARGET, e.g. 127.0.0.1:34254");

// let writer = SharedEventWriter::new(writer);
Expand All @@ -145,24 +132,20 @@ impl FilteredLogManager<EventsSetup, LogEvent> {
self.log_manager.lock().unwrap().add_timely_subscription(self.filter.clone(), pusher);
}

// /// TODO(andreal)
// pub fn to_bufs(&mut self) -> Vec<Arc<Mutex<Vec<u8>>>> {
// let mut vecs = Vec::new();

// for i in 0..4 {
// let buf = Arc::new(Mutex::new(Vec::<u8>::with_capacity(4_000_000_000)));
// let writer = SharedEventWriter::new(SharedVec::new(buf.clone()));
// let pusher: Arc<EventPusher<Product<RootTimestamp, u64>, LogMessage>+Send+Sync> = Arc::new(writer);
// self.log_manager.lock().unwrap().add_timely_subscription(Arc::new(move |s| s.index == i), pusher);
// vecs.push(buf);
// }

// vecs
// }
/// Send the selected timely events to a shared tcp socket.
///
/// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable.
pub fn to_shared_tcp_socket(&mut self) {
let target: String = ::std::env::var("TIMELY_LOG_TARGET").expect("no $TIMELY_LOG_TARGET, e.g. 127.0.0.1:34254");
let pusher = Box::new(SharedEventWriter::new(TcpStream::connect(target.clone()).expect("failed to connect to logging destination")));
self.log_manager.lock().unwrap().add_timely_subscription(self.filter.clone(), Arc::new(move || pusher.clone()));
}
}

impl FilteredLogManager<CommsSetup, CommsEvent> {
/// TODO(andreal)
/// Send the selected communication events to a tcp socket.
///
/// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable.
pub fn to_tcp_socket(&mut self) {
let comm_target = ::std::env::var("TIMELY_COMM_LOG_TARGET").expect("no $TIMELY_COMM_LOG_TARGET, e.g. 127.0.0.1:34255");

Expand All @@ -175,7 +158,7 @@ impl FilteredLogManager<CommsSetup, CommsEvent> {
}

impl LogManager {
/// TODO(andreal)
/// Constructs a new LogManager.
pub fn new() -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(LogManager {
timely_logs: HashMap::new(),
Expand All @@ -186,17 +169,16 @@ impl LogManager {
}
}

/// TODO(andreal)
/// Functions to construct log filters.
pub trait LogFilter {
/// TODO(andreal)
/// Get a `FilteredLogManager` for all workers.
fn workers(&mut self) -> FilteredLogManager<EventsSetup, LogEvent>;

/// TODO(andreal)
/// Get a `FilteredLogManager` for all communication threads.
fn comms(&mut self) -> FilteredLogManager<CommsSetup, CommsEvent>;
}

impl LogFilter for Arc<Mutex<LogManager>> {
/// TODO(andreal)
#[inline] fn workers(&mut self) -> FilteredLogManager<EventsSetup, LogEvent> {
FilteredLogManager {
log_manager: self.clone(),
Expand All @@ -205,7 +187,6 @@ impl LogFilter for Arc<Mutex<LogManager>> {
}
}

/// TODO(andreal)
#[inline] fn comms(&mut self) -> FilteredLogManager<CommsSetup, CommsEvent> {
FilteredLogManager {
log_manager: self.clone(),
Expand All @@ -215,11 +196,11 @@ impl LogFilter for Arc<Mutex<LogManager>> {
}
}

/// TODO(andreal)
/// Shared wrapper for log writer constructors.
pub struct LoggerConfig {
/// TODO(andreal)
/// Log writer constructors.
pub timely_logging: Arc<Fn(EventsSetup)->Rc<BufferingLogger<EventsSetup, LogEvent>>+Send+Sync>,
/// TODO(andreal)
/// Log writer constructors for communication.
pub communication_logging: Arc<Fn(CommsSetup)->Rc<BufferingLogger<CommsSetup, CommsEvent>>+Send+Sync>,
}

Expand Down Expand Up @@ -249,23 +230,21 @@ impl LoggerConfig {
event_manager
}

/// TODO(andreal)
/// Makes a new `LoggerConfig` wrapper from a `LogManager`.
pub fn new(log_manager: Arc<Mutex<LogManager>>) -> Self {
let timely_logging_manager = log_manager.clone();
let communication_logging_manager = log_manager;
LoggerConfig {
timely_logging: Arc::new(move |events_setup: EventsSetup| {
let subscription_manager = LoggerConfig::register_timely_logger(
&mut timely_logging_manager.lock().unwrap(), events_setup);
//eprintln!("registered timely logger: {:?}", events_setup);
Rc::new(BufferingLogger::new(events_setup, Box::new(move |data| {
subscription_manager.lock().expect("cannot lock mutex").publish_batch(data);
})))
}),
communication_logging: Arc::new(move |comms_setup: CommsSetup| {
let subscription_manager = LoggerConfig::register_comms_logger(
&mut communication_logging_manager.lock().unwrap(), comms_setup);
//eprintln!("registered comm logger: {:?}", comms_setup);
Rc::new(BufferingLogger::new(comms_setup, Box::new(move |data| {
subscription_manager.lock().expect("cannot lock mutex").publish_batch(data);
})))
Expand Down
8 changes: 3 additions & 5 deletions src/progress/nested/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,11 +946,9 @@ impl<T: Timestamp> PerOperatorState<T> {
id: self.id, start_stop: ::timely_logging::StartStop::Start
}));

// TODO(andreal) formerly behind "logging" feature flag {
// assert!(self.consumed_buffer.iter_mut().all(|cm| cm.is_empty()));
// assert!(self.internal_buffer.iter_mut().all(|cm| cm.is_empty()));
// assert!(self.produced_buffer.iter_mut().all(|cm| cm.is_empty()));
// }
debug_assert!(self.consumed_buffer.iter_mut().all(|cm| cm.is_empty()));
debug_assert!(self.internal_buffer.iter_mut().all(|cm| cm.is_empty()));
debug_assert!(self.produced_buffer.iter_mut().all(|cm| cm.is_empty()));

let result = if let Some(ref mut operator) = self.operator {
operator.pull_internal_progress(
Expand Down

0 comments on commit 9f3fea5

Please sign in to comment.