From db9506d0b6adbbf755f8029b3a2a521042df0b39 Mon Sep 17 00:00:00 2001
From: Jiang Liu
Date: Mon, 20 Mar 2023 16:54:18 +0800
Subject: [PATCH] nydus-image: enable multi-threading when exporting block images

Enable multi-threading when exporting block images to reduce export time.

Signed-off-by: Jiang Liu
---
 service/src/block_device.rs | 81 ++++++++++++++++++++++++++++++++++---
 1 file changed, 75 insertions(+), 6 deletions(-)

diff --git a/service/src/block_device.rs b/service/src/block_device.rs
index 8623a07e0b2..259f61a232c 100644
--- a/service/src/block_device.rs
+++ b/service/src/block_device.rs
@@ -11,11 +11,13 @@
 //! Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block
 //! device, so it can be directly mounted by Linux EROFS fs driver.
 
-use std::cmp::min;
+use std::cmp::{max, min};
 use std::fs::OpenOptions;
 use std::io::Result;
 use std::path::PathBuf;
 use std::sync::Arc;
+use std::thread;
+use std::thread::JoinHandle;
 
 use dbs_allocator::{Constraint, IntervalTree, NodeState, Range};
 use nydus_api::BlobCacheEntry;
@@ -284,7 +286,7 @@ impl BlockDevice {
         blob_entry: BlobCacheEntry,
         output: Option<String>,
         localfs_dir: Option<String>,
-        _threads: u32,
+        threads: u32,
     ) -> Result<()> {
         let cache_mgr = Arc::new(BlobCacheMgr::new());
         cache_mgr.add_blob_entry(&blob_entry).map_err(|e| {
@@ -305,7 +307,7 @@ impl BlockDevice {
         let path = match output {
             Some(v) => PathBuf::from(v),
             None => {
-                let path = match block_device.cache_mgr.get_config(&blob_id) {
+                let path = match cache_mgr.get_config(&blob_id) {
                     Some(BlobConfig::MetaBlob(meta)) => meta.path().to_path_buf(),
                     _ => return Err(enoent!("block_device: failed to get meta blob")),
                 };
@@ -351,10 +353,77 @@ impl BlockDevice {
         })?;
         let output_file = Arc::new(tokio_uring::fs::File::from_std(output_file));
 
-        tokio_uring::start(async move {
-            Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await
-        })?;
+        let blocks = block_device.blocks();
+        let batch_size = BLOCK_DEVICE_EXPORT_BATCH_SIZE as u32 / block_device.block_size() as u32;
+        assert_eq!(batch_size.count_ones(), 1);
+        let threads = max(threads, 1);
+        let mut threads = min(threads, 32);
+        while blocks / threads < batch_size && threads > 1 {
+            threads = threads / 2;
+        }
+        if threads == 1 {
+            tokio_uring::start(async move {
+                Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await
+            })?;
+        } else {
+            let mut thread_handlers: Vec<JoinHandle<Result<()>>> =
+                Vec::with_capacity(threads as usize);
+            let step = (blocks + batch_size - 1) & !(batch_size - 1);
+            let mut pos = 0;
+
+            for _i in 0..threads {
+                let count = min(blocks - pos, step);
+                let mgr = cache_mgr.clone();
+                let id = blob_id.clone();
+                let path = path.to_path_buf();
+
+                let handler = thread::spawn(move || {
+                    let output_file = OpenOptions::new()
+                        .read(true)
+                        .write(true)
+                        .open(&path)
+                        .map_err(|e| {
+                            eother!(format!(
+                                "block_device: failed to create output file {}, {}",
+                                path.display(),
+                                e
+                            ))
+                        })?;
+                    let file = Arc::new(tokio_uring::fs::File::from_std(output_file));
+                    let block_device = BlockDevice::new(id, mgr).map_err(|e| {
+                        eother!(format!(
+                            "block_device: failed to create block device object, {}",
+                            e
+                        ))
+                    })?;
+                    let device = Arc::new(block_device);
+
+                    tokio_uring::start(
+                        async move { Self::do_export(device, file, pos, count).await },
+                    )?;
+                    Ok(())
+                });
+                pos += count;
+                thread_handlers.push(handler);
+            }
+            assert_eq!(pos, blocks);
+            assert_eq!(thread_handlers.len(), threads as usize);
+
+            for handler in thread_handlers {
+                handler
+                    .join()
+                    .map_err(|e| {
+                        eother!(format!(
+                            "block_device: failed to wait for worker thread, {:?}",
+                            e
+                        ))
+                    })?
+                    .map_err(|e| {
+                        eother!(format!("block_device: failed to export disk image, {}", e))
+                    })?;
+            }
+        }
 
         Ok(())
     }
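
Illustration only, not part of the patch: the standalone sketch below models the worker-count
selection used above and one way to carve the block range into batch-aligned, non-overlapping
(pos, count) spans for the worker threads. The 512-byte block size and 1 MiB batch are assumed
values for the example; in the patch they come from block_device.block_size() and
BLOCK_DEVICE_EXPORT_BATCH_SIZE. The per-thread step used here (ceil-divide by the thread count,
then round up to a batch boundary) is one plausible splitting scheme, not necessarily the
patch's exact formula.

use std::cmp::{max, min};

// Mirror of the patch's clamping loop: cap the worker count to [1, 32] and
// halve it until every worker has at least one full batch of blocks.
fn effective_threads(requested: u32, blocks: u32, batch_size: u32) -> u32 {
    let mut threads = min(max(requested, 1), 32);
    while blocks / threads < batch_size && threads > 1 {
        threads /= 2;
    }
    threads
}

// Assumed splitting scheme: each worker gets roughly blocks/threads blocks,
// rounded up to a batch boundary, so the ranges are disjoint and batch-aligned
// except possibly the last one.
fn split_ranges(blocks: u32, threads: u32, batch_size: u32) -> Vec<(u32, u32)> {
    assert_eq!(batch_size.count_ones(), 1); // the batch size must be a power of two
    let step = ((blocks + threads - 1) / threads + batch_size - 1) & !(batch_size - 1);
    let mut ranges = Vec::new();
    let mut pos = 0;
    while pos < blocks {
        let count = min(blocks - pos, step);
        ranges.push((pos, count));
        pos += count;
    }
    ranges
}

fn main() {
    let batch_size = (1024 * 1024) / 512; // assumed: 1 MiB batches of 512-byte blocks
    let blocks = 1_000_000;
    let threads = effective_threads(8, blocks, batch_size);
    let ranges = split_ranges(blocks, threads, batch_size);
    assert_eq!(ranges.iter().map(|r| r.1).sum::<u32>(), blocks);
    println!("threads={} ranges={:?}", threads, ranges);
}

In the patch, each worker then opens its own handle on the output file, builds its own
BlockDevice, and runs its own tokio_uring runtime over one such range, so the workers write
disjoint regions of the disk image.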