Skip to content

Commit

Permalink
Scan files in physical order when --transform is used
Browse files Browse the repository at this point in the history
  • Loading branch information
pkolaczk committed Jun 11, 2022
1 parent eb46b32 commit 4616994
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 13 deletions.
50 changes: 50 additions & 0 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use std::{fs, io};

use byte_unit::Byte;
use bytesize::ByteSize;
use rayon::iter::{IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator};
use serde::*;
use smallvec::alloc::fmt::Formatter;
use smallvec::alloc::str::FromStr;

use crate::device::DiskDevices;
use crate::group::FileGroup;
use crate::log::Log;
use crate::path::Path;

Expand Down Expand Up @@ -452,6 +454,54 @@ impl<'de> Deserialize<'de> for FileHash {
}
}

/// Abstraction over a set of files that lets callers process them uniformly,
/// no matter how the set is stored. Some code paths work on files organized
/// into groups, while others only have a plain, flat vector of files.
pub(crate) trait FileCollection {
    /// Number of files held by the collection.
    fn count(&self) -> usize;

    /// Combined size of all files held by the collection.
    fn total_size(&self) -> FileLen;

    /// Invokes `op` once for every file in the collection, handing it
    /// a mutable reference to that file.
    ///
    /// The `Sync + Send` bounds on `op` allow an implementation to call it
    /// from several threads.
    fn for_each_mut<OP: Fn(&mut FileInfo) + Sync + Send>(&mut self, op: OP);
}

/// A flat vector of files: every element is itself one file of the collection.
impl FileCollection for Vec<FileInfo> {
    /// The number of files equals the length of the vector.
    fn count(&self) -> usize {
        self.len()
    }

    /// Adds up the `len` field of every file, using a parallel iterator.
    fn total_size(&self) -> FileLen {
        let sizes = self.par_iter().map(|file| file.len);
        sizes.sum()
    }

    /// Applies `op` to each file of the vector through a parallel mutable iterator.
    fn for_each_mut<OP>(&mut self, op: OP)
    where
        OP: Fn(&mut FileInfo) + Sync + Send,
    {
        let files = self.par_iter_mut();
        files.for_each(op);
    }
}

/// A vector of file groups: the files of the collection are the files of all groups.
impl FileCollection for Vec<FileGroup<FileInfo>> {
    /// Sums the per-group file counts.
    fn count(&self) -> usize {
        let per_group = self.iter().map(|group| group.file_count());
        per_group.sum()
    }

    /// Sums the per-group total sizes, using a parallel iterator over the groups.
    fn total_size(&self) -> FileLen {
        let per_group = self.par_iter().map(|group| group.total_size());
        per_group.sum()
    }

    /// Applies `op` to every file of every group by flattening the groups'
    /// `files` into a single parallel mutable iterator.
    fn for_each_mut<OP>(&mut self, op: OP)
    where
        OP: Fn(&mut FileInfo) + Sync + Send,
    {
        let all_files = self.par_iter_mut().flat_map(|group| &mut group.files);
        all_files.for_each(op);
    }
}

#[derive(Copy, Clone, Debug)]
pub(crate) enum FileAccess {
Sequential,
Expand Down
24 changes: 11 additions & 13 deletions src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,23 +739,20 @@ fn atomic_counter_vec(len: usize) -> Vec<std::sync::atomic::AtomicU32> {
}

/// Fetches the physical location of every file in `groups`, reporting progress
/// on a "Fetching extents" progress bar sized to the number of files.
///
/// Files residing on a device whose `disk_type` is `DiskType::SSD` are skipped.
/// A failure of `fetch_physical_location` does not abort the run; it is passed
/// to `handle_fetch_physical_location_err` together with the per-device error counters.
///
/// Compiled only when `target_os = "linux"`.
// NOTE(review): this span appears to be a rendered diff in which lines of the
// previous and the new version of the function are interleaved without +/- markers
// (two signatures, two `count` bindings, two copies of the per-file closure body).
// Confirm against the repository before treating it as compilable code.
#[cfg(target_os = "linux")]
fn update_file_locations(ctx: &GroupCtx<'_>, groups: &mut Vec<FileGroup<FileInfo>>) {
let count = file_count(groups.iter());
fn update_file_locations(ctx: &GroupCtx<'_>, groups: &mut (impl FileCollection + ?Sized)) {
let count = groups.count();
let progress = ctx.log.progress_bar("Fetching extents", count as u64);

// One counter per known device; handed to the error handler below.
let err_counters = atomic_counter_vec(ctx.devices.len());
groups
.par_iter_mut()
.flat_map(|g| &mut g.files)
.update(|fi| {
let device: &DiskDevice = &ctx.devices[fi.get_device_index()];
if device.disk_type != DiskType::SSD {
if let Err(e) = fi.fetch_physical_location() {
handle_fetch_physical_location_err(ctx, &err_counters, fi, e)
}
groups.for_each_mut(|fi| {
// Look up the device the file resides on by the file's device index.
let device: &DiskDevice = &ctx.devices[fi.get_device_index()];
if device.disk_type != DiskType::SSD {
if let Err(e) = fi.fetch_physical_location() {
handle_fetch_physical_location_err(ctx, &err_counters, fi, e)
}
})
.for_each(|_| progress.tick());
}
// Advance the progress bar once per visited file.
progress.tick()
});
}

#[cfg(not(target_os = "linux"))]
Expand Down Expand Up @@ -1071,6 +1068,7 @@ pub fn group_files(config: &GroupConfig, log: &Log) -> Result<Vec<FileGroup<File
Some(_transform) => {
let mut files = matching_files.into_iter().flatten().collect_vec();
deduplicate(&mut files, |_| {});
update_file_locations(&ctx, &mut files);
group_transformed(&ctx, files)
}
_ => {
Expand Down

0 comments on commit 4616994

Please sign in to comment.