Skip to content

Commit

Permalink
nydus-image: enable multi-threading when exporting block images
Browse files Browse the repository at this point in the history
Enable multi-threading when exporting block images, to reduce exporting
time.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
  • Loading branch information
jiangliu committed Mar 20, 2023
1 parent 7c2861a commit db9506d
Showing 1 changed file with 75 additions and 6 deletions.
81 changes: 75 additions & 6 deletions service/src/block_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
//! Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block
//! device, so it can be directly mounted by Linux EROFS fs driver.
use std::cmp::min;
use std::cmp::{max, min};
use std::fs::OpenOptions;
use std::io::Result;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use dbs_allocator::{Constraint, IntervalTree, NodeState, Range};
use nydus_api::BlobCacheEntry;
Expand Down Expand Up @@ -284,7 +286,7 @@ impl BlockDevice {
blob_entry: BlobCacheEntry,
output: Option<String>,
localfs_dir: Option<String>,
_threads: u32,
threads: u32,
) -> Result<()> {
let cache_mgr = Arc::new(BlobCacheMgr::new());
cache_mgr.add_blob_entry(&blob_entry).map_err(|e| {
Expand All @@ -305,7 +307,7 @@ impl BlockDevice {
let path = match output {
Some(v) => PathBuf::from(v),
None => {
let path = match block_device.cache_mgr.get_config(&blob_id) {
let path = match cache_mgr.get_config(&blob_id) {
Some(BlobConfig::MetaBlob(meta)) => meta.path().to_path_buf(),
_ => return Err(enoent!("block_device: failed to get meta blob")),
};
Expand Down Expand Up @@ -351,10 +353,77 @@ impl BlockDevice {
})?;
let output_file = Arc::new(tokio_uring::fs::File::from_std(output_file));

tokio_uring::start(async move {
Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await
})?;
let blocks = block_device.blocks();
let batch_size = BLOCK_DEVICE_EXPORT_BATCH_SIZE as u32 / block_device.block_size() as u32;
assert_eq!(batch_size.count_ones(), 1);
let threads = max(threads, 1);
let mut threads = min(threads, 32);
while blocks / threads < batch_size && threads > 1 {
threads = threads / 2;
}

if threads == 1 {
tokio_uring::start(async move {
Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await
})?;
} else {
let mut thread_handlers: Vec<JoinHandle<Result<()>>> =
Vec::with_capacity(threads as usize);
let step = (blocks + batch_size - 1) & !(batch_size - 1);
let mut pos = 0;

for _i in 0..threads {
let count = min(blocks - pos, step);
let mgr = cache_mgr.clone();
let id = blob_id.clone();
let path = path.to_path_buf();

let handler = thread::spawn(move || {
let output_file = OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.map_err(|e| {
eother!(format!(
"block_device: failed to create output file {}, {}",
path.display(),
e
))
})?;
let file = Arc::new(tokio_uring::fs::File::from_std(output_file));
let block_device = BlockDevice::new(id, mgr).map_err(|e| {
eother!(format!(
"block_device: failed to create block device object, {}",
e
))
})?;
let device = Arc::new(block_device);

tokio_uring::start(
async move { Self::do_export(device, file, pos, count).await },
)?;
Ok(())
});
pos += count;
thread_handlers.push(handler);
}
assert_eq!(pos, blocks);
assert_eq!(thread_handlers.len(), threads as usize);

for handler in thread_handlers {
handler
.join()
.map_err(|e| {
eother!(format!(
"block_device: failed to wait for worker thread, {:?}",
e
))
})?
.map_err(|e| {
eother!(format!("block_device: failed to export disk image, {}", e))
})?;
}
}
Ok(())
}

Expand Down

0 comments on commit db9506d

Please sign in to comment.