Skip to content

Commit

Permalink
Only copy referenced columns in queried buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Aug 19, 2024
1 parent a39a38b commit 4b2b70e
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 20 deletions.
14 changes: 13 additions & 1 deletion src/ingest/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,22 @@ impl Buffer {
}

pub fn heap_size_of_children(&self) -> usize {
self.buffer.values().map(|v| {
self.buffer
.values()
.map(|v| {
// Currently does not take into account the memory of String.
v.heap_size_of_children()
})
.sum()
}

/// Returns a new `Buffer` restricted to the requested columns.
///
/// For each name in `columns`, the matching entry of `self.buffer` (if any) is
/// cloned, key and column data alike, into the result. Names that have no entry
/// in this buffer are skipped rather than treated as an error.
///
/// `length` is copied from `self` unchanged, regardless of how many columns
/// end up in the result.
pub fn filter(&self, columns: &[String]) -> Buffer {
Buffer {
buffer: columns
.iter()
// `get` yields `None` for names absent from this buffer; `filter_map` drops those.
.filter_map(|name| self.buffer.get(name).map(|col| (name.clone(), col.clone())))
.collect(),
length: self.length,
}
}
}
3 changes: 2 additions & 1 deletion src/locustdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl LocustDB {
Err(err) => return Ok(Err(err)),
};

let mut data = match self.inner_locustdb.snapshot(&query.table) {
let referenced_cols: Vec<_> = query.find_referenced_cols().into_iter().collect();
let mut data = match self.inner_locustdb.snapshot(&query.table, Some(&referenced_cols[..])) {
Some(data) => data,
None => {
return Ok(Err(QueryError::NotImplemented(format!(
Expand Down
12 changes: 2 additions & 10 deletions src/mem_store/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,7 @@ impl Partition {
for name in md.column_name_to_subpartition_index.keys() {
cols.insert(
name.clone(),
ColumnHandle::non_resident(
table,
md.id,
name.clone(),
),
ColumnHandle::non_resident(table, md.id, name.clone()),
);
}
Partition {
Expand Down Expand Up @@ -284,11 +280,7 @@ impl ColumnHandle {
}
}

fn non_resident(
table: &str,
id: PartitionID,
name: String,
) -> ColumnHandle {
fn non_resident(table: &str, id: PartitionID, name: String) -> ColumnHandle {
ColumnHandle {
key: ColumnLocator::new(table, id, &name),
name,
Expand Down
18 changes: 13 additions & 5 deletions src/mem_store/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,22 @@ impl Table {
&self.name
}

pub fn snapshot(&self) -> Vec<Arc<Partition>> {
pub fn snapshot(&self, column_filter: Option<&[String]>) -> Vec<Arc<Partition>> {
let frozen_buffer = self.frozen_buffer.lock().unwrap();
let partitions = self.partitions.read().unwrap();
let buffer = self.buffer.lock().unwrap();
let mut partitions: Vec<_> = partitions.values().cloned().collect();
let mut offset = partitions.iter().map(|p| p.len()).sum::<usize>();
if frozen_buffer.len() > 0 {
let buffer = match column_filter {
Some(columns) => frozen_buffer.filter(columns),
None => frozen_buffer.clone(),
};
partitions.push(Arc::new(
Partition::from_buffer(
self.name(),
u64::MAX,
frozen_buffer.clone(),
buffer,
self.lru.clone(),
offset,
)
Expand All @@ -74,11 +78,15 @@ impl Table {
offset += frozen_buffer.len();
}
if buffer.len() > 0 {
let buffer = match column_filter {
Some(columns) => buffer.filter(columns),
None => buffer.clone(),
};
partitions.push(Arc::new(
Partition::from_buffer(
self.name(),
u64::MAX,
buffer.clone(),
buffer,
self.lru.clone(),
offset,
)
Expand Down Expand Up @@ -335,7 +343,7 @@ impl Table {
size_bytes: 0,
columns: HashMap::default(),
};
let partitions = self.snapshot();
let partitions = self.snapshot(None);
for partition in partitions {
partition.mem_tree(&mut tree.columns, if depth == 1 { 1 } else { depth - 1 });
tree.rows += partition.len();
Expand All @@ -348,7 +356,7 @@ impl Table {
}

pub fn stats(&self) -> TableStats {
let partitions = self.snapshot();
let partitions = self.snapshot(None);
let size_per_column = Table::size_per_column(&partitions);
let buffer = self.buffer.lock().unwrap();
TableStats {
Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/inner_locustdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ impl InnerLocustDB {
thread::spawn(move || cloned.enforce_wal_limit());
}

pub fn snapshot(&self, table: &str) -> Option<Vec<Arc<Partition>>> {
pub fn snapshot(&self, table: &str, column_filter: Option<&[String]>) -> Option<Vec<Arc<Partition>>> {
let tables = self.tables.read().unwrap();
tables.get(table).map(|t| t.snapshot())
tables.get(table).map(|t| t.snapshot(column_filter))
}

pub fn full_snapshot(&self) -> Vec<Vec<Arc<Partition>>> {
let tables = self.tables.read().unwrap();
tables.values().map(|t| t.snapshot()).collect()
tables.values().map(|t| t.snapshot(None)).collect()
}

pub fn stop(&self) {
Expand Down

0 comments on commit 4b2b70e

Please sign in to comment.