Skip to content

Commit

Permalink
[experiment] Try threadpool instead of rayon for io pool
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Jun 4, 2024
1 parent 4071006 commit 73fba63
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
11 changes: 10 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ futures-util = { workspace = true }
metrics = {workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
rocksdb = { workspace = true }
smartstring = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
threadpool = { version = "1.8" }
tokio = { workspace = true }
tracing = { workspace = true }

Expand Down
26 changes: 12 additions & 14 deletions crates/rocksdb/src/db_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ pub struct RocksDbManager {
dbs: RwLock<HashMap<DbName, Arc<RocksDb>>>,
watchdog_tx: mpsc::UnboundedSender<WatchdogCommand>,
shutting_down: AtomicBool,
high_pri_pool: rayon::ThreadPool,
low_pri_pool: rayon::ThreadPool,
high_pri_pool: threadpool::ThreadPool,
low_pri_pool: threadpool::ThreadPool,
}

impl Debug for RocksDbManager {
Expand Down Expand Up @@ -95,17 +95,15 @@ impl RocksDbManager {
env.set_background_threads(opts.rocksdb_bg_threads().get() as i32);

// Create our own storage thread pools
let high_pri_pool = rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("rs:io-hi-{}", i))
let high_pri_pool = threadpool::Builder::new()
.thread_name("rs:io-hi".to_owned())
.num_threads(opts.storage_high_priority_bg_threads().into())
.build()
.expect("storage high priority thread pool to be created");
.build();

let low_pri_pool = rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("rs:io-lo-{}", i))
let low_pri_pool = threadpool::Builder::new()
.thread_name("rs:io-lo".to_owned())
.num_threads(opts.storage_low_priority_bg_threads().into())
.build()
.expect("storage low priority thread pool to be created");
.build();

let dbs = RwLock::default();

Expand Down Expand Up @@ -377,8 +375,8 @@ impl RocksDbManager {
let (tx, rx) = tokio::sync::oneshot::channel();
let priority = task.priority;
match priority {
Priority::High => self.high_pri_pool.spawn(task.into_async_runner(tx)),
Priority::Low => self.low_pri_pool.spawn(task.into_async_runner(tx)),
Priority::High => self.high_pri_pool.execute(task.into_async_runner(tx)),
Priority::Low => self.low_pri_pool.execute(task.into_async_runner(tx)),
}
rx.await.map_err(|_| ShutdownError)
}
Expand All @@ -403,8 +401,8 @@ impl RocksDbManager {
OP: FnOnce() + Send + 'static,
{
match task.priority {
Priority::High => self.high_pri_pool.spawn(task.into_runner()),
Priority::Low => self.low_pri_pool.spawn(task.into_runner()),
Priority::High => self.high_pri_pool.execute(task.into_runner()),
Priority::Low => self.low_pri_pool.execute(task.into_runner()),
}
}
}
Expand Down

0 comments on commit 73fba63

Please sign in to comment.