diff --git a/Cargo.lock b/Cargo.lock index cc58708..107fb62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -314,6 +314,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -493,6 +503,7 @@ dependencies = [ "chrono", "clap", "console", + "crossbeam-channel", "crossbeam-utils", "csv", "dashmap", diff --git a/fclones/Cargo.toml b/fclones/Cargo.toml index 9407fee..19a38b7 100644 --- a/fclones/Cargo.toml +++ b/fclones/Cargo.toml @@ -25,6 +25,7 @@ byte-unit = "4.0" chrono = { version = "0.4", default-features = false, features = ["serde", "clock", "std"] } clap = { version = "4.4", features = ["derive", "cargo", "wrap_help"] } console = "0.15" +crossbeam-channel = "0.5" crossbeam-utils = "0.8" csv = "1.1" dashmap = "5.2" diff --git a/fclones/src/cache.rs b/fclones/src/cache.rs index daf701b..30e1eb5 100644 --- a/fclones/src/cache.rs +++ b/fclones/src/cache.rs @@ -1,9 +1,9 @@ //! Persistent caching of file hashes +use crossbeam_channel::RecvTimeoutError; use std::fmt::{Display, Formatter}; use std::fs::create_dir_all; -use std::sync::mpsc::{channel, RecvTimeoutError, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread; use std::thread::JoinHandle; use std::time::{Duration, UNIX_EPOCH}; @@ -47,7 +47,7 @@ const FLUSH_INTERVAL: Duration = Duration::from_millis(1000); /// them from file data. pub struct HashCache { cache: Arc, - _flusher: HashCacheFlusher, + flusher: HashCacheFlusher, } impl HashCache { @@ -76,10 +76,7 @@ impl HashCache { let tree_id = format!("hash_db:{:?}:{}", algorithm, transform.unwrap_or("")); let cache = Arc::new(typed_sled::Tree::open(&db, tree_id)); let flusher = HashCacheFlusher::start(&cache); - Ok(HashCache { - cache, - _flusher: flusher, - }) + Ok(HashCache { cache, flusher }) } /// Opens the file hash database located in `fclones` subdir of user cache directory. @@ -115,7 +112,11 @@ impl HashCache { .insert(key, &value) .map_err(|e| format!("Failed to write entry to cache: {e}"))?; - Ok(()) + // Check for cache flush errors. If there were errors, report them to the caller. + match self.flusher.err_channel.try_recv() { + Ok(err) => Err(err), + Err(_) => Ok(()), + } } /// Retrieves the cached hash of a file. @@ -163,24 +164,36 @@ impl HashCache { }; Ok(key) } + + /// Flushes all unwritten data and closes the cache. + pub fn close(self) -> Result<(), Error> { + self.cache + .flush() + .map_err(|e| format!("Failed to flush cache: {e}"))?; + Ok(()) + } } /// Periodically flushes the cache in a background thread struct HashCacheFlusher { thread_handle: Option>, - control_channel: Option>>, // wrapped in Mutex because Sender is not Send in older versions of Rust + control_channel: Option>, + err_channel: crossbeam_channel::Receiver, } impl HashCacheFlusher { fn start(cache: &Arc) -> HashCacheFlusher { let cache = Arc::downgrade(cache); - let (tx, rx) = channel::<()>(); + let (ctrl_tx, ctrl_rx) = crossbeam_channel::bounded::<()>(1); + let (err_tx, err_rx) = crossbeam_channel::bounded(1); let thread_handle = thread::spawn(move || { - while let Err(RecvTimeoutError::Timeout) = rx.recv_timeout(FLUSH_INTERVAL) { + while let Err(RecvTimeoutError::Timeout) = ctrl_rx.recv_timeout(FLUSH_INTERVAL) { if let Some(cache) = cache.upgrade() { if let Err(e) = cache.flush() { - eprintln!("Failed to flush hash cache: {e}"); + err_tx + .send(format!("Failed to flush the hash cache: {e}").into()) + .unwrap_or_default(); return; } } @@ -189,7 +202,8 @@ impl HashCacheFlusher { HashCacheFlusher { thread_handle: Some(thread_handle), - control_channel: Some(Mutex::new(tx)), + control_channel: Some(ctrl_tx), + err_channel: err_rx, } } } diff --git a/fclones/src/hasher.rs b/fclones/src/hasher.rs index 415545c..7c8d2a4 100644 --- a/fclones/src/hasher.rs +++ b/fclones/src/hasher.rs @@ -437,6 +437,16 @@ impl FileHasher<'_> { } } +impl<'a> Drop for FileHasher<'a> { + fn drop(&mut self) { + if let Some(cache) = self.cache.take() { + if let Err(e) = cache.close() { + self.log.warn(e); + } + } + } +} + fn format_output_stream(output: &str) -> String { let output = output.trim().to_string(); if output.is_empty() {