From 0917afb411ef4cfc33ee6bc90fdfbf96a8408ca6 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Mon, 20 Mar 2023 11:06:17 +0800 Subject: [PATCH 1/3] nydus-image: syntax changes for commandline option preparation Syntax only changes for commandline option preparation. Signed-off-by: Jiang Liu --- src/bin/nydus-image/main.rs | 375 ++++++++++++++++++------------------ 1 file changed, 190 insertions(+), 185 deletions(-) diff --git a/src/bin/nydus-image/main.rs b/src/bin/nydus-image/main.rs index 3ded8f05985..663de14e1b8 100644 --- a/src/bin/nydus-image/main.rs +++ b/src/bin/nydus-image/main.rs @@ -153,11 +153,30 @@ fn prepare_cmd_args(bti_string: &'static str) -> App { .help("Configuration file for storage backend, cache and RAFS FUSE filesystem.") .required(false); - App::new("") + let app = App::new("") .version(bti_string) .author(crate_authors!()) .about("Build, analyze, inspect or validate RAFS filesystems/Nydus accelerated container images") - .subcommand( + .arg( + Arg::new("log-file") + .long("log-file") + .short('L') + .help("Log file path") + .required(false) + .global(true), + ) + .arg( + Arg::new("log-level") + .long("log-level") + .short('l') + .help("Log level:") + .default_value("info") + .value_parser(["trace", "debug", "info", "warn", "error"]) + .required(false) + .global(true), + ); + + let app = app.subcommand( App::new("create") .about("Create RAFS filesystems from directories, tar files or OCI images") .arg( @@ -314,138 +333,138 @@ fn prepare_cmd_args(bti_string: &'static str) -> App { .arg( arg_output_json.clone(), ) - ) - .subcommand( - App::new("merge") - .about("Merge multiple bootstraps into a overlaid bootstrap") - .arg( - Arg::new("parent-bootstrap") - .long("parent-bootstrap") - .help("File path of the parent/referenced RAFS metadata blob (optional)") - .required(false), - ) - .arg( - Arg::new("bootstrap") - .long("bootstrap") - .short('B') - .help("Output path of nydus overlaid bootstrap"), - ) - .arg( - Arg::new("blob-dir") - .long("blob-dir") - .short('D') - .help("Directory path to save generated RAFS metadata and data blobs"), - ) - .arg( - arg_chunk_dict.clone(), - ) - .arg( - arg_prefetch_policy, - ) - .arg( - arg_output_json.clone(), - ) - .arg( - Arg::new("blob-digests") + ); + + let app = app.subcommand( + App::new("merge") + .about("Merge multiple bootstraps into a overlaid bootstrap") + .arg( + Arg::new("parent-bootstrap") + .long("parent-bootstrap") + .help("File path of the parent/referenced RAFS metadata blob (optional)") + .required(false), + ) + .arg( + Arg::new("bootstrap") + .long("bootstrap") + .short('B') + .help("Output path of nydus overlaid bootstrap"), + ) + .arg( + Arg::new("blob-dir") + .long("blob-dir") + .short('D') + .help("Directory path to save generated RAFS metadata and data blobs"), + ) + .arg(arg_chunk_dict.clone()) + .arg(arg_prefetch_policy) + .arg(arg_output_json.clone()) + .arg( + Arg::new("blob-digests") .long("blob-digests") - .required(false) - .help("RAFS blob digest list separated by comma"), - ) - .arg( - Arg::new("blob-sizes") + .required(false) + .help("RAFS blob digest list separated by comma"), + ) + .arg( + Arg::new("blob-sizes") .long("blob-sizes") - .required(false) - .help("RAFS blob size list separated by comma"), - ) - .arg( - Arg::new("blob-toc-digests") - .long("blob-toc-digests") - .required(false) - .help("RAFS blob toc digest list separated by comma"), - ) - .arg( - Arg::new("blob-toc-sizes") - .long("blob-toc-sizes") - .required(false) - .help("RAFS blob toc size list separated by comma"), - ) - .arg(arg_config.clone()) - .arg( - Arg::new("SOURCE") - .help("bootstrap paths (allow one or more)") - .required(true) - .num_args(1..), - ) - ) - .subcommand( - App::new("check") - .about("Validate RAFS filesystem metadata") - .arg( - Arg::new("BOOTSTRAP") - .help("File path of RAFS metadata") - .required_unless_present("bootstrap"), - ) - .arg( - Arg::new("bootstrap") - .short('B') - .long("bootstrap") - .help("[Deprecated] File path of RAFS meta blob/bootstrap") - .conflicts_with("BOOTSTRAP") - .required(false), - ) - .arg( - Arg::new("blob-dir") - .long("blob-dir") - .short('D') - .conflicts_with("config") - .help("Directory for localfs storage backend, hosting data blobs and cache files"), - ) - .arg(arg_config.clone()) - .arg( - Arg::new("verbose") - .long("verbose") - .short('v') - .help("Output message in verbose mode") - .action(ArgAction::SetTrue) - .required(false), - ) - .arg( - arg_output_json.clone(), - ) - ) - .subcommand( - App::new("inspect") - .about("Inspect RAFS filesystem metadata in interactive or request mode") - .arg( - Arg::new("BOOTSTRAP") - .help("File path of RAFS metadata") - .required_unless_present("bootstrap"), - ) - .arg( - Arg::new("bootstrap") - .short('B') - .long("bootstrap") - .help("[Deprecated] File path of RAFS meta blob/bootstrap") - .conflicts_with("BOOTSTRAP") - .required(false), - ) - .arg( - Arg::new("blob-dir") - .long("blob-dir") - .short('D') - .conflicts_with("config") - .help("Directory for localfs storage backend, hosting data blobs and cache files"), - ) - .arg(arg_config.clone()) - .arg( - Arg::new("request") - .long("request") - .short('R') - .help("Inspect RAFS filesystem metadata in request mode") - .required(false), - ) - ) - .subcommand( + .required(false) + .help("RAFS blob size list separated by comma"), + ) + .arg( + Arg::new("blob-toc-digests") + .long("blob-toc-digests") + .required(false) + .help("RAFS blob toc digest list separated by comma"), + ) + .arg( + Arg::new("blob-toc-sizes") + .long("blob-toc-sizes") + .required(false) + .help("RAFS blob toc size list separated by comma"), + ) + .arg(arg_config.clone()) + .arg( + Arg::new("SOURCE") + .help("bootstrap paths (allow one or more)") + .required(true) + .num_args(1..), + ), + ); + + let app = app.subcommand( + App::new("check") + .about("Validate RAFS filesystem metadata") + .arg( + Arg::new("BOOTSTRAP") + .help("File path of RAFS metadata") + .required_unless_present("bootstrap"), + ) + .arg( + Arg::new("bootstrap") + .short('B') + .long("bootstrap") + .help("[Deprecated] File path of RAFS meta blob/bootstrap") + .conflicts_with("BOOTSTRAP") + .required(false), + ) + .arg( + Arg::new("blob-dir") + .long("blob-dir") + .short('D') + .conflicts_with("config") + .help( + "Directory for localfs storage backend, hosting data blobs and cache files", + ), + ) + .arg(arg_config.clone()) + .arg( + Arg::new("verbose") + .long("verbose") + .short('v') + .help("Output message in verbose mode") + .action(ArgAction::SetTrue) + .required(false), + ) + .arg(arg_output_json.clone()), + ); + + let app = app.subcommand( + App::new("inspect") + .about("Inspect RAFS filesystem metadata in interactive or request mode") + .arg( + Arg::new("BOOTSTRAP") + .help("File path of RAFS metadata") + .required_unless_present("bootstrap"), + ) + .arg( + Arg::new("bootstrap") + .short('B') + .long("bootstrap") + .help("[Deprecated] File path of RAFS meta blob/bootstrap") + .conflicts_with("BOOTSTRAP") + .required(false), + ) + .arg( + Arg::new("blob-dir") + .long("blob-dir") + .short('D') + .conflicts_with("config") + .help( + "Directory for localfs storage backend, hosting data blobs and cache files", + ), + ) + .arg(arg_config.clone()) + .arg( + Arg::new("request") + .long("request") + .short('R') + .help("Inspect RAFS filesystem metadata in request mode") + .required(false), + ), + ); + + let app = app.subcommand( App::new("stat") .about("Generate statistics information for RAFS filesystems") .arg( @@ -481,8 +500,9 @@ fn prepare_cmd_args(bti_string: &'static str) -> App { .arg( arg_output_json.clone(), ) - ) - .subcommand( + ); + + let app = app.subcommand( App::new("compact") .about("(experimental)Compact specific nydus image, remove unused chunks in blobs, merge small blobs") .arg( @@ -515,69 +535,54 @@ fn prepare_cmd_args(bti_string: &'static str) -> App { .arg( arg_output_json, ) - ) - .subcommand( - App::new("unpack") + ); + + app.subcommand( + App::new("unpack") .about("Unpack a RAFS filesystem to a tar file") - .arg( - Arg::new("BOOTSTRAP") - .help("File path of RAFS metadata") - .required_unless_present("bootstrap"), - ) - .arg( - Arg::new("backend-config") - .long("backend-config") - .help("config file of backend") - .required(false), - ) - .arg( - Arg::new("bootstrap") - .short('B') - .long("bootstrap") - .help("[Deprecated] File path of RAFS meta blob/bootstrap") - .conflicts_with("BOOTSTRAP") - .required(false), - ) + .arg( + Arg::new("BOOTSTRAP") + .help("File path of RAFS metadata") + .required_unless_present("bootstrap"), + ) + .arg( + Arg::new("backend-config") + .long("backend-config") + .help("config file of backend") + .required(false), + ) + .arg( + Arg::new("bootstrap") + .short('B') + .long("bootstrap") + .help("[Deprecated] File path of RAFS meta blob/bootstrap") + .conflicts_with("BOOTSTRAP") + .required(false), + ) .arg( Arg::new("blob") - .long("blob") - .short('b') - .help("path to RAFS data blob file") - .required(false), - ) + .long("blob") + .short('b') + .help("path to RAFS data blob file") + .required(false), + ) .arg( Arg::new("blob-dir") .long("blob-dir") .short('D') .conflicts_with("config") - .help("Directory for localfs storage backend, hosting data blobs and cache files"), + .help( + "Directory for localfs storage backend, hosting data blobs and cache files", + ), ) .arg(arg_config) .arg( Arg::new("output") - .long("output") - .help("path for output tar file") - .required(true), - ) - ) - .arg( - Arg::new("log-file") - .long("log-file") - .short('L') - .help("Log file path") - .required(false) - .global(true), - ) - .arg( - Arg::new("log-level") - .long("log-level") - .short('l') - .help("Log level:") - .default_value("info") - .value_parser(["trace", "debug", "info", "warn", "error"]) - .required(false) - .global(true), - ) + .long("output") + .help("path for output tar file") + .required(true), + ), + ) } fn init_log(matches: &ArgMatches) -> Result<()> { From d5ef1412194f61b6288a348e8ef31a02959a2f86 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Mon, 20 Mar 2023 10:50:04 +0800 Subject: [PATCH 2/3] nydus-image: introduce new subcommand export Introduce new subcommand `export` to nydus-image, which will be used to export RAFS filesystems as raw block device images or tar files. Signed-off-by: Jiang Liu --- Cargo.toml | 2 +- service/src/block_device.rs | 133 +++++++++++++++++++++++++++++++++++- src/bin/nydus-image/main.rs | 127 +++++++++++++++++++++++++++++++++- 3 files changed, 256 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a7da30efc74..9359bb5bb46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ nydus-api = { version = "0.2.2", path = "api", features = ["handler"] } nydus-app = { version = "0.3.2", path = "app" } nydus-error = { version = "0.2.3", path = "error" } nydus-rafs = { version = "0.2.2", path = "rafs", features = ["builder"] } -nydus-service = { version = "0.2.0", path = "service" } +nydus-service = { version = "0.2.0", path = "service", features = ["block-device"] } nydus-storage = { version = "0.6.2", path = "storage" } nydus-utils = { version = "0.4.1", path = "utils" } diff --git a/service/src/block_device.rs b/service/src/block_device.rs index a751ef9cbf9..a40eb73a108 100644 --- a/service/src/block_device.rs +++ b/service/src/block_device.rs @@ -12,16 +12,22 @@ //! device, so it can be directly mounted by Linux EROFS fs driver. use std::cmp::min; +use std::fs::OpenOptions; use std::io::Result; +use std::path::PathBuf; use std::sync::Arc; use dbs_allocator::{Constraint, IntervalTree, NodeState, Range}; +use nydus_api::BlobCacheEntry; use nydus_rafs::metadata::layout::v6::{ EROFS_BLOCK_BITS_12, EROFS_BLOCK_BITS_9, EROFS_BLOCK_SIZE_4096, EROFS_BLOCK_SIZE_512, }; +use nydus_storage::utils::alloc_buf; use tokio_uring::buf::IoBufMut; -use crate::blob_cache::{BlobCacheMgr, BlobConfig, DataBlob, MetaBlob}; +use crate::blob_cache::{generate_blob_key, BlobCacheMgr, BlobConfig, DataBlob, MetaBlob}; + +const BLOCK_DEVICE_EXPORT_BATCH_SIZE: usize = 0x80000; enum BlockRange { Hole, @@ -81,7 +87,7 @@ impl BlockDevice { meta_blob_config.blob_id() )) })?; - ranges.update(&range, BlockRange::MetaBlob(meta_blob.clone())); + ranges.update(&range, BlockRange::MetaBlob(meta_blob)); let mut pos = blocks; let data_blobs = meta_blob_config.get_blobs(); @@ -272,6 +278,127 @@ impl BlockDevice { (Ok(total_size), buf) } + + /// Export a RAFS filesystem as a raw block disk image. + pub fn export( + blob_entry: BlobCacheEntry, + output: Option, + localfs_dir: Option, + _threads: u32, + ) -> Result<()> { + let cache_mgr = Arc::new(BlobCacheMgr::new()); + cache_mgr.add_blob_entry(&blob_entry).map_err(|e| { + eother!(format!( + "block_device: failed to add blob into CacheMgr, {}", + e + )) + })?; + let blob_id = generate_blob_key(&blob_entry.domain_id, &blob_entry.blob_id); + let block_device = BlockDevice::new(blob_id.clone(), cache_mgr.clone()).map_err(|e| { + eother!(format!( + "block_device: failed to create block device object, {}", + e + )) + })?; + let block_device = Arc::new(block_device); + + let path = match output { + Some(v) => PathBuf::from(v), + None => { + let path = match cache_mgr.get_config(&blob_id) { + Some(BlobConfig::MetaBlob(meta)) => meta.path().to_path_buf(), + _ => return Err(enoent!("block_device: failed to get meta blob")), + }; + if !path.is_file() { + return Err(eother!(format!( + "block_device: meta blob {} is not a file", + path.display() + ))); + } + let name = path + .file_name() + .ok_or_else(|| { + eother!(format!( + "block_device: failed to get file name from {}", + path.display() + )) + })? + .to_str() + .ok_or_else(|| { + eother!(format!( + "block_device: failed to get file name from {}", + path.display() + )) + })?; + let dir = localfs_dir + .ok_or_else(|| einval!("block_device: parameter `localfs_dir` is missing"))?; + let path = PathBuf::from(dir); + path.join(name.to_string() + ".disk") + } + }; + + let output_file = OpenOptions::new() + .create_new(true) + .read(true) + .write(true) + .open(&path) + .map_err(|e| { + eother!(format!( + "block_device: failed to create output file {}, {}", + path.display(), + e + )) + })?; + let output_file = Arc::new(tokio_uring::fs::File::from_std(output_file)); + + tokio_uring::start(async move { + Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await + })?; + + Ok(()) + } + + async fn do_export( + block_device: Arc, + output_file: Arc, + start: u32, + mut blocks: u32, + ) -> Result<()> { + let batch_size = BLOCK_DEVICE_EXPORT_BATCH_SIZE as u32 / block_device.block_size() as u32; + let mut pos = start; + let mut buf = alloc_buf(BLOCK_DEVICE_EXPORT_BATCH_SIZE); + + while blocks > 0 { + let count = min(batch_size, blocks); + let (res, buf1) = block_device.async_read(pos, count, buf).await; + let sz = res?; + if sz != count as usize * block_device.block_size() as usize { + return Err(eio!( + "block_device: failed to read data, got less data than requested" + )); + } + buf = buf1; + + if sz != buf.len() { + buf.resize(sz, 0); + } + let (res, buf2) = output_file + .write_at(buf, block_device.blocks_to_size(pos)) + .await; + let sz1 = res?; + if sz1 != sz { + return Err(eio!( + "block_device: failed to write data to disk image file, written less data than requested" + )); + } + buf = buf2; + + pos += count; + blocks -= count; + } + + Ok(()) + } } #[cfg(test)] @@ -337,7 +464,7 @@ mod tests { assert!(mgr.get_config(&key).is_some()); let mgr = Arc::new(mgr); - let device = BlockDevice::new(blob_id.clone(), mgr).unwrap(); + let device = BlockDevice::new(blob_id, mgr).unwrap(); assert_eq!(device.blocks(), 0x209); tokio_uring::start(async move { diff --git a/src/bin/nydus-image/main.rs b/src/bin/nydus-image/main.rs index 663de14e1b8..d19c1d86900 100644 --- a/src/bin/nydus-image/main.rs +++ b/src/bin/nydus-image/main.rs @@ -18,14 +18,15 @@ use std::convert::TryFrom; use std::fs::{self, metadata, DirEntry, File, OpenOptions}; use std::os::unix::fs::FileTypeExt; use std::path::{Path, PathBuf}; +use std::str::FromStr; use std::sync::Arc; use anyhow::{bail, Context, Result}; use clap::parser::ValueSource; use clap::{Arg, ArgAction, ArgMatches, Command as App}; use nix::unistd::{getegid, geteuid}; -use nydus::get_build_time_info; -use nydus_api::{BuildTimeInfo, ConfigV2, LocalFsConfig}; +use nydus::{get_build_time_info, SubCmdArgs}; +use nydus_api::{BlobCacheEntry, BuildTimeInfo, ConfigV2, LocalFsConfig}; use nydus_app::setup_logging; use nydus_rafs::builder::{ parse_chunk_dict_arg, ArtifactStorage, BlobCompactor, BlobManager, BootstrapManager, @@ -33,6 +34,8 @@ use nydus_rafs::builder::{ HashChunkDict, Merger, Prefetch, PrefetchPolicy, StargzBuilder, TarballBuilder, WhiteoutSpec, }; use nydus_rafs::metadata::{RafsSuper, RafsSuperConfig, RafsVersion}; +use nydus_service::block_device::BlockDevice; +use nydus_service::{validate_threads_configuration, ServiceArgs}; use nydus_storage::backend::localfs::LocalFs; use nydus_storage::backend::BlobBackend; use nydus_storage::device::BlobFeatures; @@ -429,6 +432,55 @@ fn prepare_cmd_args(bti_string: &'static str) -> App { .arg(arg_output_json.clone()), ); + let app = app.subcommand( + App::new("export") + .about("Export RAFS filesystems as raw block disk images or tar files") + .arg( + Arg::new("block") + .long("block") + .action(ArgAction::SetTrue) + .required(true) + .help("Export RAFS filesystems as raw block disk images") + ) + .arg( + Arg::new("bootstrap") + .long("bootstrap") + .short('B') + .help("Bootstrap of the RAFS filesystem to be exported") + .requires("localfs-dir") + ) + .arg(Arg::new("config") + .long("config") + .short('C') + .help("Configuration file containing a `BlobCacheEntry` object") + .required(false)) + .arg( + Arg::new("localfs-dir") + .long("localfs-dir") + .short('D') + .help( + "Path to the `localfs` working directory, which also enables the `localfs` storage backend" + ) + .requires("bootstrap") + .conflicts_with("config"), + ) + .arg( + Arg::new("threads") + .long("threads") + .default_value("4") + .help("Number of worker threads to execute export operation, valid values: [1-32]") + .value_parser(thread_validator) + .required(false), + ) + .arg( + Arg::new("output") + .long("output") + .short('O') + .help("File path for saving the exported content") + .required_unless_present("localfs-dir") + ) + ); + let app = app.subcommand( App::new("inspect") .about("Inspect RAFS filesystem metadata in interactive or request mode") @@ -624,6 +676,8 @@ fn main() -> Result<()> { Command::merge(matches, &build_info) } else if let Some(matches) = cmd.subcommand_matches("check") { Command::check(matches, &build_info) + } else if let Some(matches) = cmd.subcommand_matches("export") { + Command::export(&cmd, matches, &build_info) } else if let Some(matches) = cmd.subcommand_matches("inspect") { Command::inspect(matches) } else if let Some(matches) = cmd.subcommand_matches("stat") { @@ -1128,6 +1182,16 @@ impl Command { Ok(()) } + fn export(args: &ArgMatches, subargs: &ArgMatches, build_info: &BuildTimeInfo) -> Result<()> { + let subargs = SubCmdArgs::new(args, subargs); + if subargs.is_present("block") { + Self::export_block(&subargs, build_info)?; + } else { + bail!("unknown export type"); + } + Ok(()) + } + fn inspect(matches: &ArgMatches) -> Result<()> { let bootstrap_path = Self::get_bootstrap(matches)?; let mut config = Self::get_configuration(matches)?; @@ -1443,4 +1507,63 @@ impl Command { ); Ok(()) } + + fn export_block(subargs: &SubCmdArgs, _bti: &BuildTimeInfo) -> Result<()> { + let mut localfs_dir = None; + let mut entry = if let Some(dir) = subargs.value_of("localfs-dir") { + // Safe to unwrap because `--block` requires `--bootstrap`. + let bootstrap = subargs.value_of("bootstrap").unwrap(); + let config = format!( + r#" + {{ + "type": "bootstrap", + "id": "disk-default", + "domain_id": "block-nbd", + "config_v2": {{ + "version": 2, + "id": "block-nbd-factory", + "backend": {{ + "type": "localfs", + "localfs": {{ + "dir": "{}" + }} + }}, + "cache": {{ + "type": "filecache", + "filecache": {{ + "work_dir": "{}" + }} + }}, + "metadata_path": "{}" + }} + }}"#, + dir, dir, bootstrap + ); + localfs_dir = Some(dir.to_string()); + BlobCacheEntry::from_str(&config)? + } else if let Some(v) = subargs.value_of("config") { + BlobCacheEntry::from_file(v)? + } else { + bail!("both option `-C/--config` and `-D/--localfs-dir` are missing"); + }; + if !entry.prepare_configuration_info() { + bail!("invalid blob cache entry configuration information"); + } + if !entry.validate() { + bail!("invalid blob cache entry configuration information"); + } + + let threads: u32 = subargs + .value_of("threads") + .map(|n| n.parse().unwrap_or(1)) + .unwrap_or(1); + let output = subargs.value_of("output").map(|v| v.to_string()); + + BlockDevice::export(entry, output, localfs_dir, threads) + .context("failed to export RAFS filesystem as raw block device image") + } +} + +fn thread_validator(v: &str) -> std::result::Result { + validate_threads_configuration(v).map(|s| s.to_string()) } From fc3979e46aedd9b3fbd68d8cc5c4936c6c46261f Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Mon, 20 Mar 2023 16:54:18 +0800 Subject: [PATCH 3/3] nydus-image: enable multi-threading when exporting block images Enable multi-threading when exporting block images, to reduce exporting time. Signed-off-by: Jiang Liu --- service/src/block_device.rs | 79 ++++++++++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 5 deletions(-) diff --git a/service/src/block_device.rs b/service/src/block_device.rs index a40eb73a108..d3f921a3753 100644 --- a/service/src/block_device.rs +++ b/service/src/block_device.rs @@ -11,11 +11,13 @@ //! Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block //! device, so it can be directly mounted by Linux EROFS fs driver. -use std::cmp::min; +use std::cmp::{max, min}; use std::fs::OpenOptions; use std::io::Result; use std::path::PathBuf; use std::sync::Arc; +use std::thread; +use std::thread::JoinHandle; use dbs_allocator::{Constraint, IntervalTree, NodeState, Range}; use nydus_api::BlobCacheEntry; @@ -284,7 +286,7 @@ impl BlockDevice { blob_entry: BlobCacheEntry, output: Option, localfs_dir: Option, - _threads: u32, + threads: u32, ) -> Result<()> { let cache_mgr = Arc::new(BlobCacheMgr::new()); cache_mgr.add_blob_entry(&blob_entry).map_err(|e| { @@ -351,10 +353,77 @@ impl BlockDevice { })?; let output_file = Arc::new(tokio_uring::fs::File::from_std(output_file)); - tokio_uring::start(async move { - Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await - })?; + let blocks = block_device.blocks(); + let batch_size = BLOCK_DEVICE_EXPORT_BATCH_SIZE as u32 / block_device.block_size() as u32; + assert_eq!(batch_size.count_ones(), 1); + let threads = max(threads, 1); + let mut threads = min(threads, 32); + while blocks / threads < batch_size && threads > 1 { + threads /= 2; + } + if threads == 1 { + tokio_uring::start(async move { + Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await + })?; + } else { + let mut thread_handlers: Vec>> = + Vec::with_capacity(threads as usize); + let step = (blocks + batch_size - 1) & !(batch_size - 1); + let mut pos = 0; + + for _i in 0..threads { + let count = min(blocks - pos, step); + let mgr = cache_mgr.clone(); + let id = blob_id.clone(); + let path = path.to_path_buf(); + + let handler = thread::spawn(move || { + let output_file = OpenOptions::new() + .read(true) + .write(true) + .open(&path) + .map_err(|e| { + eother!(format!( + "block_device: failed to create output file {}, {}", + path.display(), + e + )) + })?; + let file = Arc::new(tokio_uring::fs::File::from_std(output_file)); + let block_device = BlockDevice::new(id, mgr).map_err(|e| { + eother!(format!( + "block_device: failed to create block device object, {}", + e + )) + })?; + let device = Arc::new(block_device); + + tokio_uring::start( + async move { Self::do_export(device, file, pos, count).await }, + )?; + Ok(()) + }); + pos += count; + thread_handlers.push(handler); + } + assert_eq!(pos, blocks); + assert_eq!(thread_handlers.len(), threads as usize); + + for handler in thread_handlers { + handler + .join() + .map_err(|e| { + eother!(format!( + "block_device: failed to wait for worker thread, {:?}", + e + )) + })? + .map_err(|e| { + eother!(format!("block_device: failed to export disk image, {}", e)) + })?; + } + } Ok(()) }