From 2e469259346c506158f0c537ac8f94f88b506d4d Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 4 Jun 2024 10:27:09 +0100 Subject: [PATCH] allow changing io-mode for writes via hidden config --- .../src/loglets/local_loglet/log_store_writer.rs | 7 ++++++- crates/partition-store/src/partition_store.rs | 12 +++++++++++- crates/types/src/config/bifrost.rs | 6 ++++++ crates/types/src/config/worker.rs | 6 ++++++ 4 files changed, 29 insertions(+), 2 deletions(-) diff --git a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs b/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs index 777735463..e7cc77c63 100644 --- a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs +++ b/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs @@ -249,12 +249,17 @@ impl LogStoreWriter { "Committing local loglet current write batch: {} items", write_batch.len(), ); + let io_mode = if opts.always_commit_in_background { + IoMode::AlwaysBackground + } else { + IoMode::Default + }; let result = self .rocksdb .write_batch( "local-loglet-write-batch", Priority::High, - IoMode::default(), + io_mode, write_opts, write_batch, ) diff --git a/crates/partition-store/src/partition_store.rs b/crates/partition-store/src/partition_store.rs index 651eb5462..ca3f1cd29 100644 --- a/crates/partition-store/src/partition_store.rs +++ b/crates/partition-store/src/partition_store.rs @@ -18,6 +18,7 @@ use codederror::CodedError; use restate_rocksdb::CfName; use restate_rocksdb::IoMode; use restate_rocksdb::Priority; +use restate_types::config::Configuration; use rocksdb::DBCompressionType; use rocksdb::DBPinnableSlice; use rocksdb::DBRawIteratorWithThreadMode; @@ -503,11 +504,20 @@ impl<'a> Transaction for RocksDBTransaction<'a> { if write_batch.is_empty() { return Ok(()); } + let io_mode = if Configuration::pinned() + .worker + .storage + .always_commit_in_background + { + IoMode::AlwaysBackground + } else { + IoMode::Default + }; let mut opts = rocksdb::WriteOptions::default(); // We disable WAL since bifrost is our durable distributed log. opts.disable_wal(true); self.rocksdb - .write_tx_batch(Priority::High, IoMode::AlwaysBackground, opts, write_batch) + .write_tx_batch(Priority::High, io_mode, opts, write_batch) .await .map_err(|error| StorageError::Generic(error.into())) } diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index f7a285b6d..95ee4aeed 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -73,6 +73,11 @@ pub struct LocalLogletOptions { /// Disable fsync of WAL on every batch rocksdb_disable_wal_fsync: bool, + /// Whether to perform commits in background IO thread pools eagerly or not + #[cfg_attr(feature = "schemars", schemars(skip))] + #[serde(skip_serializing_if = "std::ops::Not::not", default)] + pub always_commit_in_background: bool, + /// Trigger a commit when the batch size exceeds this threshold. /// /// Set to 0 or 1 to commit the write batch on every command. @@ -134,6 +139,7 @@ impl Default for LocalLogletOptions { writer_batch_commit_count: 2000, writer_batch_commit_duration: Duration::ZERO.into(), rocksdb_disable_wal_fsync: false, + always_commit_in_background: false, } } } diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 261c6b7ad..3ef33e06e 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -222,6 +222,11 @@ pub struct StorageOptions { /// persist a lsn if the partition processor has applied at least #threshold log entries since /// the last persisting. This prevents the worker from flushing the RocksDB memtables too often. pub persist_lsn_threshold: u64, + + /// Whether to perform commits in background IO thread pools eagerly or not + #[cfg_attr(feature = "schemars", schemars(skip))] + #[serde(skip_serializing_if = "std::ops::Not::not", default)] + pub always_commit_in_background: bool, } impl StorageOptions { @@ -286,6 +291,7 @@ impl Default for StorageOptions { // persist the lsn every hour persist_lsn_interval: Some(Duration::from_secs(60 * 60).into()), persist_lsn_threshold: 1000, + always_commit_in_background: true, } } }