refactor: use multi threading when computing outboards
I did not do this previously because something wasn't ready.
But now I think it is time.
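
In other words, the diff below stops awaiting each blocking task before spawning the next one: it now spawns a `spawn_blocking` task per file up front and awaits them all together with `future::join_all`, so the outboards are computed in parallel on tokio's blocking thread pool. A minimal, self-contained sketch of that fan-out/fan-in pattern (not code from this repository; `expensive_work` is a hypothetical stand-in for `compute_outboard`, and the example assumes the `tokio`, `futures`, and `anyhow` crates):

use futures::future;

// Hypothetical stand-in for compute_outboard: any CPU-bound, synchronous job.
fn expensive_work(input: u64) -> anyhow::Result<u64> {
    Ok(input * 2)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let inputs = vec![1u64, 2, 3];

    // Before: each blocking task was awaited before the next was spawned,
    // so the jobs ran one at a time.
    for input in inputs.clone() {
        let _ = tokio::task::spawn_blocking(move || expensive_work(input)).await??;
    }

    // After: spawn every blocking task first, then await them together.
    // spawn_blocking schedules each closure on the blocking thread pool as
    // soon as join_all drains the iterator, so the jobs run in parallel.
    let handles = inputs
        .into_iter()
        .map(|input| tokio::task::spawn_blocking(move || expensive_work(input)));
    let results = future::join_all(handles)
        .await
        .into_iter()
        .collect::<Result<Result<Vec<_>, _>, _>>()??;
    println!("{:?}", results);
    Ok(())
}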
rklaehn committed Feb 13, 2023
1 parent b3ee39f commit 34cf223
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions src/provider.rs
@@ -20,6 +20,7 @@ use std::{collections::HashMap, sync::Arc};
 use anyhow::{bail, ensure, Context, Result};
 use bao::encode::SliceExtractor;
 use bytes::{Bytes, BytesMut};
+use futures::future;
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::broadcast;
@@ -555,8 +556,16 @@ impl From<&std::path::Path> for DataSource {
 ///
 /// If the size of the file is changed while this is running, an error will be
 /// returned.
-fn compute_outboard(path: PathBuf) -> anyhow::Result<(Hash, Vec<u8>)> {
-    let file = std::fs::File::open(path)?;
+fn compute_outboard(
+    path: PathBuf,
+    name: Option<String>,
+) -> anyhow::Result<(PathBuf, Option<String>, Hash, Vec<u8>)> {
+    ensure!(
+        path.is_file(),
+        "can only transfer blob data: {}",
+        path.display()
+    );
+    let file = std::fs::File::open(&path)?;
     let len = file.metadata()?.len();
     // compute outboard size so we can pre-allocate the buffer.
     //
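
The comment above is cut off by the collapsed region, but the pre-allocation it refers to is something bao supports directly: the crate can report the outboard size for a given content length ahead of time. A hedged sketch of what the hidden lines plausibly do, assuming the bao crate's `encode::outboard_size` helper (the actual collapsed code may differ in detail):

use anyhow::Context;

// Sketch only: pre-allocate the outboard buffer for a file of length `len`.
fn preallocate_outboard(len: u64) -> anyhow::Result<Vec<u8>> {
    let outboard_size = usize::try_from(bao::encode::outboard_size(len))
        .context("outboard size does not fit in usize")?;
    Ok(Vec::with_capacity(outboard_size))
}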
@@ -581,7 +590,7 @@ fn compute_outboard(path: PathBuf) -> anyhow::Result<(Hash, Vec<u8>)> {
     // this flips the outboard encoding from post-order to pre-order
     let hash = encoder.finalize()?;
 
-    Ok((hash.into(), outboard))
+    Ok((path, name, hash.into(), outboard))
 }
 
 /// Creates a database of blobs (stored in outboard storage) and Collections, stored in memory.
@@ -593,23 +602,19 @@ pub async fn create_collection(data_sources: Vec<DataSource>) -> Result<(Databas
     let mut total_blobs_size: u64 = 0;
 
     let mut blobs_encoded_size_estimate = 0;
-    for data in data_sources {
-
+    let outboards = data_sources.into_iter().map(|data| {
         let (path, name) = match data {
             DataSource::File(path) => (path, None),
             DataSource::NamedFile { path, name } => (path, Some(name)),
         };
-
-        ensure!(
-            path.is_file(),
-            "can only transfer blob data: {}",
-            path.display()
-        );
         // spawn a blocking task for computing the hash and outboard.
         // pretty sure this is best to remain sync even once bao is async.
-        let path2 = path.clone();
-        let (hash, outboard) =
-            tokio::task::spawn_blocking(move || compute_outboard(path2)).await??;
-
+        tokio::task::spawn_blocking(move || compute_outboard(path, name))
+    });
+    let outboards = future::join_all(outboards)
+        .await
+        .into_iter()
+        .collect::<Result<Result<Vec<_>, _>, _>>()??;
+    for (path, name, hash, outboard) in outboards {
         debug_assert!(outboard.len() >= 8, "outboard must at least contain size");
         let size = u64::from_le_bytes(outboard[..8].try_into().unwrap());
         db.insert(
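
The `collect` into nested `Result`s above is the subtle part of the fan-in: the outer layer is tokio's `JoinError` (the task panicked or was cancelled), the inner layer is `compute_outboard`'s own `anyhow` error, and the trailing `??` unwraps both. A small standalone illustration of the same shape (the `u32` payload is arbitrary):

use tokio::task::JoinError;

// Collecting Vec<Result<Result<T, E1>, E2>> into Result<Result<Vec<T>, E1>, E2>
// short-circuits on the first error of either kind; `??` then peels off
// both layers.
fn unwrap_both(results: Vec<Result<anyhow::Result<u32>, JoinError>>) -> anyhow::Result<Vec<u32>> {
    Ok(results
        .into_iter()
        .collect::<Result<Result<Vec<_>, _>, _>>()??)
}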
@@ -631,6 +636,7 @@ pub async fn create_collection(data_sources: Vec<DataSource>) -> Result<(Databas
         blobs_encoded_size_estimate += name.len() + 32;
         blobs.push(Blob { name, hash });
     }
+
     let c = Collection {
         name: "collection".to_string(),
         blobs,
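
One detail the per-blob loop in the diff relies on: after the post-order to pre-order flip in compute_outboard, the outboard starts with the content length as a little-endian u64, which is what the `debug_assert!` and the `u64::from_le_bytes(outboard[..8]...)` lines read. A tiny helper showing the same extraction, assuming a pre-order outboard:

// Assumes a pre-order outboard: the first 8 bytes are the content length,
// little-endian.
fn content_len(outboard: &[u8]) -> Option<u64> {
    let header: [u8; 8] = outboard.get(..8)?.try_into().ok()?;
    Some(u64::from_le_bytes(header))
}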
