From 55b3a5259e66a197d4512999c9ccab3e7a373fbe Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 21 Feb 2024 17:22:57 +0800 Subject: [PATCH] *: remove unnecessary async blocks to save memory (#16541) close tikv/tikv#16540 *: enable linters about async and futures We should be pedantic about writing async code, as it's easy to write suboptimal or even bloat code. See: /~https://github.com/rust-lang/rust/issues/69826 *: remove unnecessary async blocks to save memory This commit favors FutureExt::map over async blocks to mitigate the issue of async block doubled memory usage. Through the sysbench oltp_read_only test, it was observed that this adjustment resulted in approximately 26% reduction in memory usage. See: /~https://github.com/rust-lang/rust/issues/59087 Signed-off-by: Neil Shen Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Signed-off-by: dbsid --- .../backup-stream/src/checkpoint_manager.rs | 1 + components/backup-stream/src/router.rs | 6 ++-- .../backup-stream/src/subscription_manager.rs | 14 ++++---- components/backup/src/softlimit.rs | 12 +++---- components/resolved_ts/src/scanner.rs | 7 ++-- components/resource_control/src/future.rs | 1 + .../tikv_util/src/yatp_pool/future_pool.rs | 17 +++++---- scripts/clippy | 27 ++++++++++++-- src/read_pool.rs | 20 +++++------ src/server/status_server/mod.rs | 35 +++++++++---------- src/storage/txn/sched_pool.rs | 8 +---- 11 files changed, 84 insertions(+), 64 deletions(-) diff --git a/components/backup-stream/src/checkpoint_manager.rs b/components/backup-stream/src/checkpoint_manager.rs index e511b104c23..b5af66ab4fe 100644 --- a/components/backup-stream/src/checkpoint_manager.rs +++ b/components/backup-stream/src/checkpoint_manager.rs @@ -613,6 +613,7 @@ pub mod tests { Self(Arc::new(Mutex::new(inner))) } + #[allow(clippy::unused_async)] pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> { panic!("failed in a case should never fail: {}", status); } diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index 9ad8521a1b7..492c426f3b8 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -940,7 +940,7 @@ impl StreamTaskInfo { #[allow(clippy::map_entry)] if !w.contains_key(&key) { let path = key.temp_file_name(); - let val = Mutex::new(DataFile::new(path, &self.temp_file_pool).await?); + let val = Mutex::new(DataFile::new(path, &self.temp_file_pool)?); w.insert(key, val); } @@ -1444,7 +1444,7 @@ impl MetadataInfo { impl DataFile { /// create and open a logfile at the path. /// Note: if a file with same name exists, would truncate it. - async fn new(local_path: impl AsRef, files: &Arc) -> Result { + fn new(local_path: impl AsRef, files: &Arc) -> Result { let sha256 = Hasher::new(MessageDigest::sha256()) .map_err(|err| Error::Other(box_err!("openssl hasher failed to init: {}", err)))?; let inner = files.open_for_write(local_path.as_ref())?; @@ -2434,7 +2434,7 @@ mod tests { let mut f = pool.open_for_write(file_path).unwrap(); f.write_all(b"test-data").await?; f.done().await?; - let mut data_file = DataFile::new(&file_path, &pool).await.unwrap(); + let mut data_file = DataFile::new(file_path, &pool).unwrap(); let info = DataFileInfo::new(); let mut meta = MetadataInfo::with_capacity(1); diff --git a/components/backup-stream/src/subscription_manager.rs b/components/backup-stream/src/subscription_manager.rs index 7641d400fec..829d18cabac 100644 --- a/components/backup-stream/src/subscription_manager.rs +++ b/components/backup-stream/src/subscription_manager.rs @@ -435,7 +435,7 @@ where let now = Instant::now(); let timedout = self.wait(Duration::from_secs(5)).await; if timedout { - warn!("waiting for initial scanning done timed out, forcing progress!"; + warn!("waiting for initial scanning done timed out, forcing progress!"; "take" => ?now.saturating_elapsed(), "timedout" => %timedout); } let regions = resolver.resolve(self.subs.current_regions(), min_ts).await; @@ -453,7 +453,7 @@ where callback(ResolvedRegions::new(rts, cps)); } ObserveOp::HighMemUsageWarning { region_id } => { - self.on_high_memory_usage(region_id).await; + self.on_high_memory_usage(region_id); } } } @@ -507,7 +507,7 @@ where } } - async fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) { + fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) { let mut lame_region = Region::new(); lame_region.set_id(inconsistent_region_id); let mut act_region = None; @@ -517,9 +517,9 @@ where }); let delay = OOM_BACKOFF_BASE + Duration::from_secs(rand::thread_rng().gen_range(0..OOM_BACKOFF_JITTER_SECS)); - info!("log backup triggering high memory usage."; - "region" => %inconsistent_region_id, - "mem_usage" => %self.memory_manager.used_ratio(), + info!("log backup triggering high memory usage."; + "region" => %inconsistent_region_id, + "mem_usage" => %self.memory_manager.used_ratio(), "mem_max" => %self.memory_manager.capacity()); if let Some(region) = act_region { self.schedule_start_observe(delay, region, None); @@ -786,7 +786,7 @@ where let feedback_channel = match self.messenger.upgrade() { Some(ch) => ch, None => { - warn!("log backup subscription manager is shutting down, aborting new scan."; + warn!("log backup subscription manager is shutting down, aborting new scan."; utils::slog_region(region), "handle" => ?handle.id); return; } diff --git a/components/backup/src/softlimit.rs b/components/backup/src/softlimit.rs index c3a2fc7c796..6afd1f5b2a6 100644 --- a/components/backup/src/softlimit.rs +++ b/components/backup/src/softlimit.rs @@ -38,7 +38,7 @@ impl SoftLimit { Ok(()) } - async fn grant_tokens(&self, n: usize) { + fn grant_tokens(&self, n: usize) { self.0.semaphore.add_permits(n); } @@ -53,9 +53,9 @@ impl SoftLimit { /// Grows the tasks can be executed concurrently by n #[cfg(test)] - pub async fn grow(&self, n: usize) { + pub fn grow(&self, n: usize) { self.0.cap.fetch_add(n, Ordering::SeqCst); - self.grant_tokens(n).await; + self.grant_tokens(n); } /// resize the tasks available concurrently. @@ -66,7 +66,7 @@ impl SoftLimit { self.take_tokens(current - target).await?; } CmpOrder::Less => { - self.grant_tokens(target - current).await; + self.grant_tokens(target - current); } _ => {} } @@ -304,7 +304,7 @@ mod softlimit_test { ) .await; - limit_cloned.grow(1).await; + limit_cloned.grow(1); let working_cloned = working.clone(); should_satisfy_in( Duration::from_secs(10), @@ -314,7 +314,7 @@ mod softlimit_test { .await; let working_cloned = working.clone(); - limit_cloned.grow(2).await; + limit_cloned.grow(2); should_satisfy_in( Duration::from_secs(10), "waiting for worker grow to 4", diff --git a/components/resolved_ts/src/scanner.rs b/components/resolved_ts/src/scanner.rs index c0715b42ff1..1a98ac001dd 100644 --- a/components/resolved_ts/src/scanner.rs +++ b/components/resolved_ts/src/scanner.rs @@ -43,7 +43,7 @@ pub struct ScanTask { } impl ScanTask { - async fn send_entries(&self, entries: ScanEntries, apply_index: u64) { + fn send_entries(&self, entries: ScanEntries, apply_index: u64) { let task = Task::ScanLocks { region_id: self.region.get_id(), observe_id: self.handle.id, @@ -159,11 +159,10 @@ impl, E: KvEngine> ScannerPool { if has_remaining { start_key = Some(locks.last().unwrap().0.clone()) } - task.send_entries(ScanEntries::Lock(locks), apply_index) - .await; + task.send_entries(ScanEntries::Lock(locks), apply_index); } RTS_SCAN_DURATION_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64()); - task.send_entries(ScanEntries::None, apply_index).await; + task.send_entries(ScanEntries::None, apply_index); }; self.workers.spawn(fut); } diff --git a/components/resource_control/src/future.rs b/components/resource_control/src/future.rs index 0750a21c574..345f7d88f36 100644 --- a/components/resource_control/src/future.rs +++ b/components/resource_control/src/future.rs @@ -275,6 +275,7 @@ mod tests { } } + #[allow(clippy::unused_async)] async fn empty() {} #[test] diff --git a/components/tikv_util/src/yatp_pool/future_pool.rs b/components/tikv_util/src/yatp_pool/future_pool.rs index 75d65fe4641..14ff6e12d12 100644 --- a/components/tikv_util/src/yatp_pool/future_pool.rs +++ b/components/tikv_util/src/yatp_pool/future_pool.rs @@ -13,6 +13,7 @@ use std::{ use fail::fail_point; use futures::channel::oneshot::{self, Canceled}; +use futures_util::future::FutureExt; use prometheus::{IntCounter, IntGauge}; use tracker::TrackedFuture; use yatp::{queue::Extras, task::future}; @@ -216,11 +217,13 @@ impl PoolInner { metrics_running_task_count.inc(); - let f = async move { - let _ = future.await; + // NB: Prefer FutureExt::map to async block, because an async block + // doubles memory usage. + // See /~https://github.com/rust-lang/rust/issues/59087 + let f = future.map(move |_| { metrics_handled_task_count.inc(); metrics_running_task_count.dec(); - }; + }); if let Some(extras) = extras { self.pool.spawn(future::TaskCell::new(f, extras)); @@ -246,12 +249,14 @@ impl PoolInner { let (tx, rx) = oneshot::channel(); metrics_running_task_count.inc(); - self.pool.spawn(async move { - let res = future.await; + // NB: Prefer FutureExt::map to async block, because an async block + // doubles memory usage. + // See /~https://github.com/rust-lang/rust/issues/59087 + self.pool.spawn(future.map(move |res| { metrics_handled_task_count.inc(); metrics_running_task_count.dec(); let _ = tx.send(res); - }); + })); Ok(rx) } } diff --git a/scripts/clippy b/scripts/clippy index 23ed1a2cd9c..3701099dafa 100755 --- a/scripts/clippy +++ b/scripts/clippy @@ -21,8 +21,6 @@ fi # - `derive_partial_eq_without_eq` has compilation overhead. # - Blocking issue for enabling `result_large_err` is the protobuf messages. # - Blocking issue for clippy::large_enum_variant is the raftstore peer message. -# - Enables `clippy::needless_return_with_question_mark` after -# /~https://github.com/rust-lang/rust-clippy/issues/11982 is fixed. CLIPPY_LINTS=( -A clippy::module_inception \ -A clippy::result_large_err \ @@ -50,9 +48,32 @@ CLIPPY_LINTS=( -D clippy::disallowed_methods \ -D rust-2018-idioms \ -D clippy::assertions_on_result_states \ - -A clippy::needless_return_with_question_mark \ -A clippy::non_canonical_partial_ord_impl \ -A clippy::arc_with_non_send_sync \ +) + +# TODO: Enables `clippy::needless_return_with_question_mark` after +# /~https://github.com/rust-lang/rust-clippy/issues/11982 is fixed. +CLIPPY_LINTS+=( + -A clippy::needless_return_with_question_mark \ +) + +# We should be pedantic about writing async code, as it's easy to write +# suboptimal or even bloat code. See: +# - /~https://github.com/rust-lang/rust/issues/69826 +# - /~https://github.com/rust-lang/rust/issues/69663 +# - /~https://github.com/rust-lang/rust/issues/71407 +CLIPPY_LINTS+=( + -D clippy::redundant_async_block \ + -D clippy::unused_async \ + -D clippy::manual_async_fn \ + -D clippy::large_futures \ +) + +# Allow let_underscore_future temporary due to lots of counterexamples in +# tests. +# TODO: deny it. +CLIPPY_LINTS+=( -A clippy::let_underscore_future \ ) diff --git a/src/read_pool.rs b/src/read_pool.rs index dec25a87dc0..a61c42dfba4 100644 --- a/src/read_pool.rs +++ b/src/read_pool.rs @@ -12,7 +12,10 @@ use std::{ }; use file_system::{set_io_type, IoType}; -use futures::{channel::oneshot, future::TryFutureExt}; +use futures::{ + channel::oneshot, + future::{FutureExt, TryFutureExt}, +}; use kvproto::{errorpb, kvrpcpb::CommandPri}; use online_config::{ConfigChange, ConfigManager, ConfigValue, Result as CfgResult}; use prometheus::{core::Metric, Histogram, IntCounter, IntGauge}; @@ -172,10 +175,9 @@ impl ReadPoolHandle { TaskCell::new( TrackedFuture::new(with_resource_limiter( ControlledFuture::new( - async move { - f.await; + f.map(move |_| { running_tasks.dec(); - }, + }), resource_ctl.clone(), group_name, ), @@ -185,10 +187,9 @@ impl ReadPoolHandle { ) } else { TaskCell::new( - TrackedFuture::new(async move { - f.await; + TrackedFuture::new(f.map(move |_| { running_tasks.dec(); - }), + })), extras, ) }; @@ -212,10 +213,9 @@ impl ReadPoolHandle { { let (tx, rx) = oneshot::channel::(); let res = self.spawn( - async move { - let res = f.await; + f.map(move |res| { let _ = tx.send(res); - }, + }), priority, task_id, metadata, diff --git a/src/server/status_server/mod.rs b/src/server/status_server/mod.rs index 862b2b19c72..e87a214f2a6 100644 --- a/src/server/status_server/mod.rs +++ b/src/server/status_server/mod.rs @@ -133,7 +133,7 @@ where }) } - async fn dump_heap_prof_to_resp(req: Request) -> hyper::Result> { + fn dump_heap_prof_to_resp(req: Request) -> hyper::Result> { let query = req.uri().query().unwrap_or(""); let query_pairs: HashMap<_, _> = url::form_urlencoded::parse(query.as_bytes()).collect(); @@ -173,7 +173,7 @@ where } } - async fn get_config( + fn get_config( req: Request, cfg_controller: &ConfigController, ) -> hyper::Result> { @@ -205,7 +205,7 @@ where }) } - async fn get_cmdline(_req: Request) -> hyper::Result> { + fn get_cmdline(_req: Request) -> hyper::Result> { let args = args().fold(String::new(), |mut a, b| { a.push_str(&b); a.push('\x00'); @@ -219,7 +219,7 @@ where Ok(response) } - async fn get_symbol_count(req: Request) -> hyper::Result> { + fn get_symbol_count(req: Request) -> hyper::Result> { assert_eq!(req.method(), Method::GET); // We don't know how many symbols we have, but we // do have symbol information. pprof only cares whether @@ -340,7 +340,7 @@ where }) } - async fn update_config_from_toml_file( + fn update_config_from_toml_file( cfg_controller: ConfigController, _req: Request, ) -> hyper::Result> { @@ -432,7 +432,7 @@ where } } - async fn get_engine_type(cfg_controller: &ConfigController) -> hyper::Result> { + fn get_engine_type(cfg_controller: &ConfigController) -> hyper::Result> { let engine_type = cfg_controller.get_engine_type(); let response = Response::builder() .header("Content-Type", mime::TEXT_PLAIN.to_string()) @@ -459,7 +459,7 @@ impl StatusServer where R: 'static + Send + RaftExtension + Clone, { - async fn dump_async_trace() -> hyper::Result> { + fn dump_async_trace() -> hyper::Result> { Ok(make_response( StatusCode::OK, tracing_active_tree::layer::global().fmt_bytes_with(|t, buf| { @@ -470,7 +470,7 @@ where )) } - async fn handle_pause_grpc( + fn handle_pause_grpc( mut grpc_service_mgr: GrpcServiceManager, ) -> hyper::Result> { if let Err(err) = grpc_service_mgr.pause() { @@ -485,7 +485,7 @@ where )) } - async fn handle_resume_grpc( + fn handle_resume_grpc( mut grpc_service_mgr: GrpcServiceManager, ) -> hyper::Result> { if let Err(err) = grpc_service_mgr.resume() { @@ -686,21 +686,21 @@ where )) } (Method::GET, "/debug/pprof/heap") => { - Self::dump_heap_prof_to_resp(req).await + Self::dump_heap_prof_to_resp(req) } - (Method::GET, "/debug/pprof/cmdline") => Self::get_cmdline(req).await, + (Method::GET, "/debug/pprof/cmdline") => Self::get_cmdline(req), (Method::GET, "/debug/pprof/symbol") => { - Self::get_symbol_count(req).await + Self::get_symbol_count(req) } (Method::POST, "/debug/pprof/symbol") => Self::get_symbol(req).await, (Method::GET, "/config") => { - Self::get_config(req, &cfg_controller).await + Self::get_config(req, &cfg_controller) } (Method::POST, "/config") => { Self::update_config(cfg_controller.clone(), req).await } (Method::GET, "/engine_type") => { - Self::get_engine_type(&cfg_controller).await + Self::get_engine_type(&cfg_controller) } // This interface is used for configuration file hosting scenarios, // TiKV will not update configuration files, and this interface will @@ -708,7 +708,6 @@ where // hand it over to the hosting platform for processing. (Method::PUT, "/config/reload") => { Self::update_config_from_toml_file(cfg_controller.clone(), req) - .await } (Method::GET, "/debug/pprof/profile") => { Self::dump_cpu_prof_to_resp(req).await @@ -729,12 +728,12 @@ where Self::handle_get_all_resource_groups(resource_manager.as_ref()) } (Method::PUT, "/pause_grpc") => { - Self::handle_pause_grpc(grpc_service_mgr).await + Self::handle_pause_grpc(grpc_service_mgr) } (Method::PUT, "/resume_grpc") => { - Self::handle_resume_grpc(grpc_service_mgr).await + Self::handle_resume_grpc(grpc_service_mgr) } - (Method::GET, "/async_tasks") => Self::dump_async_trace().await, + (Method::GET, "/async_tasks") => Self::dump_async_trace(), _ => { is_unknown_path = true; Ok(make_response(StatusCode::NOT_FOUND, "path not found")) diff --git a/src/storage/txn/sched_pool.rs b/src/storage/txn/sched_pool.rs index 3ba486a6496..0d153288624 100644 --- a/src/storage/txn/sched_pool.rs +++ b/src/storage/txn/sched_pool.rs @@ -130,13 +130,7 @@ impl PriorityQueue { extras.set_metadata(metadata.to_vec()); self.worker_pool.spawn_with_extras( with_resource_limiter( - ControlledFuture::new( - async move { - f.await; - }, - self.resource_ctl.clone(), - group_name, - ), + ControlledFuture::new(f, self.resource_ctl.clone(), group_name), resource_limiter, ), extras,