Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed IPC projection #1149

Merged
merged 1 commit into from
Jul 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::io::{Read, Seek};

use arrow_format;
Expand Down Expand Up @@ -323,12 +323,6 @@ pub fn prepare_projection(
fields: &[Field],
mut projection: Vec<usize>,
) -> (Vec<usize>, HashMap<usize, usize>, Vec<Field>) {
assert_eq!(
projection.iter().collect::<HashSet<_>>().len(),
projection.len(),
"The projection on IPC must not contain duplicates"
);

let fields = projection.iter().map(|x| fields[*x].clone()).collect();

// todo: find way to do this more efficiently
Expand All @@ -337,14 +331,25 @@ pub fn prepare_projection(
let map = indices.iter().copied().enumerate().fold(
HashMap::default(),
|mut acc, (index, new_index)| {
if !acc.contains_key(&new_index) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch could lead to less mappings than there were projections. I believe this was a bug, because all columns need to be mapped.

acc.insert(index, new_index);
};
acc.insert(index, new_index);
acc
},
);
projection.sort_unstable();

// check unique
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we already sort the projection, we don't need to collect into a HashSet to check uniqueness.

if !projection.is_empty() {
let mut previous = projection[0];

for &i in &projection[1..] {
assert!(
previous < i,
"The projection on IPC must not contain duplicates"
);
previous = i;
}
}

(projection, map, fields)
}

Expand All @@ -353,9 +358,11 @@ pub fn apply_projection(
map: &HashMap<usize, usize>,
) -> Chunk<Box<dyn Array>> {
// re-order according to projection
let mut arrays = chunk.into_arrays();
map.iter().for_each(|(old, new)| {
arrays.swap(*old, *new);
});
Chunk::new(arrays)
let arrays = chunk.into_arrays();
let mut new_arrays = arrays.clone();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swapping could fail depending of the order of swappend indices. We need to make a copy to ensure we don't swap too early.

map.iter()
.for_each(|(old, new)| new_arrays[*new] = arrays[*old].clone());

Chunk::new(new_arrays)
}