Skip to content

Commit

Permalink
*: remove unnecessary async blocks to save memory (tikv#16541)
Browse files Browse the repository at this point in the history
close tikv#16540

*: enable linters about async and futures
  We should be pedantic about writing async code, as it's easy to write
  suboptimal or even bloat code.
  See: rust-lang/rust#69826

*: remove unnecessary async blocks to save memory
  This commit favors FutureExt::map over async blocks to mitigate
  the issue of async block doubled memory usage. Through the sysbench
  oltp_read_only test, it was observed that this adjustment resulted
  in approximately 26% reduction in memory usage.
  See: rust-lang/rust#59087

Signed-off-by: Neil Shen <overvenus@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Signed-off-by: dbsid <chenhuansheng@pingcap.com>
  • Loading branch information
2 people authored and dbsid committed Mar 24, 2024
1 parent 72d5717 commit 55b3a52
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 64 deletions.
1 change: 1 addition & 0 deletions components/backup-stream/src/checkpoint_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ pub mod tests {
Self(Arc::new(Mutex::new(inner)))
}

#[allow(clippy::unused_async)]
pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> {
panic!("failed in a case should never fail: {}", status);
}
Expand Down
6 changes: 3 additions & 3 deletions components/backup-stream/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ impl StreamTaskInfo {
#[allow(clippy::map_entry)]
if !w.contains_key(&key) {
let path = key.temp_file_name();
let val = Mutex::new(DataFile::new(path, &self.temp_file_pool).await?);
let val = Mutex::new(DataFile::new(path, &self.temp_file_pool)?);
w.insert(key, val);
}

Expand Down Expand Up @@ -1444,7 +1444,7 @@ impl MetadataInfo {
impl DataFile {
/// create and open a logfile at the path.
/// Note: if a file with same name exists, would truncate it.
async fn new(local_path: impl AsRef<Path>, files: &Arc<TempFilePool>) -> Result<Self> {
fn new(local_path: impl AsRef<Path>, files: &Arc<TempFilePool>) -> Result<Self> {
let sha256 = Hasher::new(MessageDigest::sha256())
.map_err(|err| Error::Other(box_err!("openssl hasher failed to init: {}", err)))?;
let inner = files.open_for_write(local_path.as_ref())?;
Expand Down Expand Up @@ -2434,7 +2434,7 @@ mod tests {
let mut f = pool.open_for_write(file_path).unwrap();
f.write_all(b"test-data").await?;
f.done().await?;
let mut data_file = DataFile::new(&file_path, &pool).await.unwrap();
let mut data_file = DataFile::new(file_path, &pool).unwrap();
let info = DataFileInfo::new();

let mut meta = MetadataInfo::with_capacity(1);
Expand Down
14 changes: 7 additions & 7 deletions components/backup-stream/src/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ where
let now = Instant::now();
let timedout = self.wait(Duration::from_secs(5)).await;
if timedout {
warn!("waiting for initial scanning done timed out, forcing progress!";
warn!("waiting for initial scanning done timed out, forcing progress!";
"take" => ?now.saturating_elapsed(), "timedout" => %timedout);
}
let regions = resolver.resolve(self.subs.current_regions(), min_ts).await;
Expand All @@ -453,7 +453,7 @@ where
callback(ResolvedRegions::new(rts, cps));
}
ObserveOp::HighMemUsageWarning { region_id } => {
self.on_high_memory_usage(region_id).await;
self.on_high_memory_usage(region_id);
}
}
}
Expand Down Expand Up @@ -507,7 +507,7 @@ where
}
}

async fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) {
fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) {
let mut lame_region = Region::new();
lame_region.set_id(inconsistent_region_id);
let mut act_region = None;
Expand All @@ -517,9 +517,9 @@ where
});
let delay = OOM_BACKOFF_BASE
+ Duration::from_secs(rand::thread_rng().gen_range(0..OOM_BACKOFF_JITTER_SECS));
info!("log backup triggering high memory usage.";
"region" => %inconsistent_region_id,
"mem_usage" => %self.memory_manager.used_ratio(),
info!("log backup triggering high memory usage.";
"region" => %inconsistent_region_id,
"mem_usage" => %self.memory_manager.used_ratio(),
"mem_max" => %self.memory_manager.capacity());
if let Some(region) = act_region {
self.schedule_start_observe(delay, region, None);
Expand Down Expand Up @@ -786,7 +786,7 @@ where
let feedback_channel = match self.messenger.upgrade() {
Some(ch) => ch,
None => {
warn!("log backup subscription manager is shutting down, aborting new scan.";
warn!("log backup subscription manager is shutting down, aborting new scan.";
utils::slog_region(region), "handle" => ?handle.id);
return;
}
Expand Down
12 changes: 6 additions & 6 deletions components/backup/src/softlimit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl SoftLimit {
Ok(())
}

async fn grant_tokens(&self, n: usize) {
fn grant_tokens(&self, n: usize) {
self.0.semaphore.add_permits(n);
}

Expand All @@ -53,9 +53,9 @@ impl SoftLimit {

/// Grows the tasks can be executed concurrently by n
#[cfg(test)]
pub async fn grow(&self, n: usize) {
pub fn grow(&self, n: usize) {
self.0.cap.fetch_add(n, Ordering::SeqCst);
self.grant_tokens(n).await;
self.grant_tokens(n);
}

/// resize the tasks available concurrently.
Expand All @@ -66,7 +66,7 @@ impl SoftLimit {
self.take_tokens(current - target).await?;
}
CmpOrder::Less => {
self.grant_tokens(target - current).await;
self.grant_tokens(target - current);
}
_ => {}
}
Expand Down Expand Up @@ -304,7 +304,7 @@ mod softlimit_test {
)
.await;

limit_cloned.grow(1).await;
limit_cloned.grow(1);
let working_cloned = working.clone();
should_satisfy_in(
Duration::from_secs(10),
Expand All @@ -314,7 +314,7 @@ mod softlimit_test {
.await;

let working_cloned = working.clone();
limit_cloned.grow(2).await;
limit_cloned.grow(2);
should_satisfy_in(
Duration::from_secs(10),
"waiting for worker grow to 4",
Expand Down
7 changes: 3 additions & 4 deletions components/resolved_ts/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct ScanTask {
}

impl ScanTask {
async fn send_entries(&self, entries: ScanEntries, apply_index: u64) {
fn send_entries(&self, entries: ScanEntries, apply_index: u64) {
let task = Task::ScanLocks {
region_id: self.region.get_id(),
observe_id: self.handle.id,
Expand Down Expand Up @@ -159,11 +159,10 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine> ScannerPool<T, E> {
if has_remaining {
start_key = Some(locks.last().unwrap().0.clone())
}
task.send_entries(ScanEntries::Lock(locks), apply_index)
.await;
task.send_entries(ScanEntries::Lock(locks), apply_index);
}
RTS_SCAN_DURATION_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
task.send_entries(ScanEntries::None, apply_index).await;
task.send_entries(ScanEntries::None, apply_index);
};
self.workers.spawn(fut);
}
Expand Down
1 change: 1 addition & 0 deletions components/resource_control/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ mod tests {
}
}

#[allow(clippy::unused_async)]
async fn empty() {}

#[test]
Expand Down
17 changes: 11 additions & 6 deletions components/tikv_util/src/yatp_pool/future_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{

use fail::fail_point;
use futures::channel::oneshot::{self, Canceled};
use futures_util::future::FutureExt;
use prometheus::{IntCounter, IntGauge};
use tracker::TrackedFuture;
use yatp::{queue::Extras, task::future};
Expand Down Expand Up @@ -216,11 +217,13 @@ impl PoolInner {

metrics_running_task_count.inc();

let f = async move {
let _ = future.await;
// NB: Prefer FutureExt::map to async block, because an async block
// doubles memory usage.
// See /~https://github.com/rust-lang/rust/issues/59087
let f = future.map(move |_| {
metrics_handled_task_count.inc();
metrics_running_task_count.dec();
};
});

if let Some(extras) = extras {
self.pool.spawn(future::TaskCell::new(f, extras));
Expand All @@ -246,12 +249,14 @@ impl PoolInner {

let (tx, rx) = oneshot::channel();
metrics_running_task_count.inc();
self.pool.spawn(async move {
let res = future.await;
// NB: Prefer FutureExt::map to async block, because an async block
// doubles memory usage.
// See /~https://github.com/rust-lang/rust/issues/59087
self.pool.spawn(future.map(move |res| {
metrics_handled_task_count.inc();
metrics_running_task_count.dec();
let _ = tx.send(res);
});
}));
Ok(rx)
}
}
Expand Down
27 changes: 24 additions & 3 deletions scripts/clippy
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ fi
# - `derive_partial_eq_without_eq` has compilation overhead.
# - Blocking issue for enabling `result_large_err` is the protobuf messages.
# - Blocking issue for clippy::large_enum_variant is the raftstore peer message.
# - Enables `clippy::needless_return_with_question_mark` after
# /~https://github.com/rust-lang/rust-clippy/issues/11982 is fixed.
CLIPPY_LINTS=(
-A clippy::module_inception \
-A clippy::result_large_err \
Expand Down Expand Up @@ -50,9 +48,32 @@ CLIPPY_LINTS=(
-D clippy::disallowed_methods \
-D rust-2018-idioms \
-D clippy::assertions_on_result_states \
-A clippy::needless_return_with_question_mark \
-A clippy::non_canonical_partial_ord_impl \
-A clippy::arc_with_non_send_sync \
)

# TODO: Enables `clippy::needless_return_with_question_mark` after
# /~https://github.com/rust-lang/rust-clippy/issues/11982 is fixed.
CLIPPY_LINTS+=(
-A clippy::needless_return_with_question_mark \
)

# We should be pedantic about writing async code, as it's easy to write
# suboptimal or even bloat code. See:
# - /~https://github.com/rust-lang/rust/issues/69826
# - /~https://github.com/rust-lang/rust/issues/69663
# - /~https://github.com/rust-lang/rust/issues/71407
CLIPPY_LINTS+=(
-D clippy::redundant_async_block \
-D clippy::unused_async \
-D clippy::manual_async_fn \
-D clippy::large_futures \
)

# Allow let_underscore_future temporary due to lots of counterexamples in
# tests.
# TODO: deny it.
CLIPPY_LINTS+=(
-A clippy::let_underscore_future \
)

Expand Down
20 changes: 10 additions & 10 deletions src/read_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::{
};

use file_system::{set_io_type, IoType};
use futures::{channel::oneshot, future::TryFutureExt};
use futures::{
channel::oneshot,
future::{FutureExt, TryFutureExt},
};
use kvproto::{errorpb, kvrpcpb::CommandPri};
use online_config::{ConfigChange, ConfigManager, ConfigValue, Result as CfgResult};
use prometheus::{core::Metric, Histogram, IntCounter, IntGauge};
Expand Down Expand Up @@ -172,10 +175,9 @@ impl ReadPoolHandle {
TaskCell::new(
TrackedFuture::new(with_resource_limiter(
ControlledFuture::new(
async move {
f.await;
f.map(move |_| {
running_tasks.dec();
},
}),
resource_ctl.clone(),
group_name,
),
Expand All @@ -185,10 +187,9 @@ impl ReadPoolHandle {
)
} else {
TaskCell::new(
TrackedFuture::new(async move {
f.await;
TrackedFuture::new(f.map(move |_| {
running_tasks.dec();
}),
})),
extras,
)
};
Expand All @@ -212,10 +213,9 @@ impl ReadPoolHandle {
{
let (tx, rx) = oneshot::channel::<T>();
let res = self.spawn(
async move {
let res = f.await;
f.map(move |res| {
let _ = tx.send(res);
},
}),
priority,
task_id,
metadata,
Expand Down
Loading

0 comments on commit 55b3a52

Please sign in to comment.