Skip to content

Commit

Permalink
feat(query): support query level spill setting (#17542)
Browse files Browse the repository at this point in the history
* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting

* feat(query): support query level spill setting
  • Loading branch information
zhang2014 authored Mar 3, 2025
1 parent c7ed78d commit 5cbc165
Show file tree
Hide file tree
Showing 45 changed files with 878 additions and 540 deletions.
13 changes: 1 addition & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/common/base/src/runtime/memory/mem_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ impl MemStat {
}

#[inline]
pub fn get_memory_usage(&self) -> i64 {
self.used.load(Ordering::Relaxed)
pub fn get_memory_usage(&self) -> usize {
std::cmp::max(self.used.load(Ordering::Relaxed), 0) as usize
}

#[inline]
Expand Down
7 changes: 6 additions & 1 deletion src/query/ee/tests/it/aggregating_index/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,14 +1030,19 @@ fn get_test_suites() -> Vec<TestSuite> {

async fn test_fuzz_impl(format: &str, spill: bool) -> Result<()> {
let test_suites = get_test_suites();

let spill_settings = if spill {
Some(HashMap::from([
(
"aggregate_spilling_memory_ratio".to_string(),
"100".to_string(),
),
(
"aggregate_spilling_bytes_threshold_per_proc".to_string(),
"query_out_of_memory_behavior".to_string(),
"spilling".to_string(),
),
(
"max_query_memory_usage".to_string(),
"1".to_string(),
),
]))
Expand Down
222 changes: 222 additions & 0 deletions src/query/pipeline/transforms/src/processors/memory_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::runtime::MemStat;
use databend_common_base::runtime::GLOBAL_MEM_STAT;

#[derive(Clone)]
pub struct MemorySettings {
pub max_memory_usage: usize,
pub enable_global_level_spill: bool,
pub global_memory_tracking: &'static MemStat,

pub max_query_memory_usage: usize,
pub query_memory_tracking: Option<Arc<MemStat>>,
pub enable_query_level_spill: bool,

pub spill_unit_size: usize,
}

impl MemorySettings {
pub fn disable_spill() -> MemorySettings {
MemorySettings {
spill_unit_size: 0,
max_memory_usage: usize::MAX,
enable_global_level_spill: false,
max_query_memory_usage: usize::MAX,
query_memory_tracking: None,
enable_query_level_spill: false,
global_memory_tracking: &GLOBAL_MEM_STAT,
}
}

pub fn always_spill(spill_unit_size: usize) -> MemorySettings {
MemorySettings {
spill_unit_size,
max_memory_usage: 0,
max_query_memory_usage: 0,
enable_query_level_spill: true,
enable_global_level_spill: true,
global_memory_tracking: &GLOBAL_MEM_STAT,
query_memory_tracking: None,
}
}

pub fn check_spill(&self) -> bool {
if self.enable_global_level_spill
&& self.global_memory_tracking.get_memory_usage() >= self.max_memory_usage
{
return true;
}

let Some(query_memory_tracking) = self.query_memory_tracking.as_ref() else {
return false;
};

self.enable_query_level_spill
&& query_memory_tracking.get_memory_usage() >= self.max_query_memory_usage
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use databend_common_base::base::GlobalUniqName;

use super::*;

impl Default for MemorySettings {
fn default() -> Self {
Self {
max_memory_usage: usize::MAX,
enable_global_level_spill: false,
global_memory_tracking: create_static_mem_stat(0),
max_query_memory_usage: 0,
query_memory_tracking: None,
enable_query_level_spill: false,
spill_unit_size: 4096,
}
}
}

fn create_static_mem_stat(usage: usize) -> &'static MemStat {
let mem_stat = MemStat::create(GlobalUniqName::unique());
let _ = mem_stat.record_memory::<false>(usage as i64, 0);
Box::leak(Box::new(Arc::into_inner(mem_stat).unwrap()))
}

fn create_mem_stat(usage: usize) -> Arc<MemStat> {
let mem_stat = MemStat::create(GlobalUniqName::unique());
let _ = mem_stat.record_memory::<false>(usage as i64, 0);
mem_stat
}

#[test]
fn global_spill_triggered_when_global_memory_reaches_threshold() {
let global_mem = create_static_mem_stat(100);
let settings = MemorySettings {
enable_global_level_spill: true,
global_memory_tracking: global_mem,
max_memory_usage: 100,
..Default::default()
};
assert!(settings.check_spill());
}

#[test]
fn query_spill_triggered_when_both_levels_enabled_and_query_exceeds() {
let query_mem = create_mem_stat(100);
let global_mem = create_static_mem_stat(50);

let settings = MemorySettings {
enable_global_level_spill: true,
global_memory_tracking: global_mem,
max_query_memory_usage: 100,
enable_query_level_spill: true,
query_memory_tracking: Some(query_mem.clone()),
max_memory_usage: 100,
..Default::default()
};
assert!(settings.check_spill());
}

#[test]
fn query_spill_alone_triggered_when_enabled_and_exceeds() {
let query_mem = create_mem_stat(100);

let settings = MemorySettings {
enable_query_level_spill: true,
max_query_memory_usage: 100,
query_memory_tracking: Some(query_mem.clone()),
..Default::default()
};
assert!(settings.check_spill());
}

#[test]
fn no_spill_when_neither_condition_met() {
let global_mem = create_static_mem_stat(50);
let query_mem = create_mem_stat(50);

let settings = MemorySettings {
enable_global_level_spill: true,
global_memory_tracking: global_mem,
max_query_memory_usage: 100,
max_memory_usage: 100,
enable_query_level_spill: true,
query_memory_tracking: Some(query_mem.clone()),
..Default::default()
};
assert!(!settings.check_spill());
}

#[test]
fn no_query_spill_when_no_tracking() {
let global_mem = create_static_mem_stat(50);

let settings = MemorySettings {
enable_global_level_spill: false,
global_memory_tracking: global_mem,
max_query_memory_usage: 100,
max_memory_usage: 100,
enable_query_level_spill: true,
query_memory_tracking: None,
..Default::default()
};
assert!(!settings.check_spill());
}

#[test]
fn boundary_case_exact_threshold() {
let query_mem = create_mem_stat(100);

let settings = MemorySettings {
enable_query_level_spill: true,
max_query_memory_usage: 100,
query_memory_tracking: Some(query_mem.clone()),
..Default::default()
};
assert!(settings.check_spill());
}

#[test]
fn global_priority_over_query_when_both_exceed() {
let query_mem = create_mem_stat(150);
let global_mem = create_static_mem_stat(150);

let settings = MemorySettings {
enable_global_level_spill: true,
global_memory_tracking: global_mem,
max_query_memory_usage: 100,
max_memory_usage: 100,
enable_query_level_spill: true,
query_memory_tracking: Some(query_mem.clone()),
..Default::default()
};
assert!(settings.check_spill());
}

#[test]
fn no_spill_when_both_levels_disabled() {
let settings = MemorySettings {
enable_global_level_spill: false,
enable_query_level_spill: false,
..Default::default()
};
assert!(!settings.check_spill());
}
}
2 changes: 2 additions & 0 deletions src/query/pipeline/transforms/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod memory_settings;
mod transforms;

pub use memory_settings::MemorySettings;
pub use transforms::*;
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use super::transform_sort_merge_base::MergeSort;
use super::transform_sort_merge_base::TransformSortMergeBase;
use super::AccumulatingTransform;
use crate::processors::sort::Merger;
use crate::MemorySettings;

/// Merge sort blocks without limit.
///
Expand Down Expand Up @@ -213,14 +214,15 @@ pub fn sort_merge(
have_order_col: bool,
) -> Result<Vec<DataBlock>> {
let sort_desc = Arc::new(sort_desc);
let mut memory_settings = MemorySettings::disable_spill();
memory_settings.spill_unit_size = sort_spilling_batch_bytes;

let mut processor = MergeSortCommon::try_create(
schema.clone(),
sort_desc.clone(),
have_order_col,
false,
0,
0,
sort_spilling_batch_bytes,
memory_settings,
MergeSortCommonImpl::create(schema, sort_desc, block_size, enable_loser_tree, None),
)?;
for block in data_blocks {
Expand Down
Loading

0 comments on commit 5cbc165

Please sign in to comment.