Skip to content

Commit

Permalink
Minor config tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 28, 2024
1 parent 2347dca commit 70acd6f
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 26 deletions.
7 changes: 4 additions & 3 deletions crates/bifrost/src/loglets/local_loglet/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::Write;
use std::mem::size_of;

use bytes::{Buf, BufMut, Bytes, BytesMut};
Expand All @@ -17,6 +16,8 @@ use restate_types::logs::SequenceNumber;

use crate::loglet::LogletOffset;

pub(crate) const DATA_KEY_PREFIX_LENGTH: usize = size_of::<u8>() + size_of::<u64>();

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordKey {
pub log_id: u64,
Expand All @@ -37,7 +38,7 @@ impl RecordKey {

pub fn to_bytes(self) -> Bytes {
let mut buf = BytesMut::with_capacity(size_of::<Self>() + 1);
buf.write_char('d').expect("enough key buffer");
buf.put_u8(b'd');
buf.put_u64(self.log_id);
buf.put_u64(self.offset.into());
buf.freeze()
Expand Down Expand Up @@ -75,7 +76,7 @@ impl MetadataKey {
pub fn to_bytes(self) -> Bytes {
let mut buf = BytesMut::with_capacity(size_of::<Self>() + 1);
// m for metadata
buf.write_char('m').expect("enough key buffer");
buf.put_u8(b'm');
buf.put_u64(self.log_id);
buf.put_u8(self.kind as u8);
buf.freeze()
Expand Down
17 changes: 10 additions & 7 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use restate_rocksdb::{
use restate_types::arc_util::Updateable;
use restate_types::config::{LocalLogletOptions, RocksDbOptions};
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
use rocksdb::{BoundColumnFamily, DBCompressionType, DB};
use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB};

use super::keys::{MetadataKey, MetadataKind};
use super::keys::{MetadataKey, MetadataKind, DATA_KEY_PREFIX_LENGTH};
use super::log_state::{log_state_full_merge, log_state_partial_merge, LogState};
use super::log_store_writer::LogStoreWriter;

Expand Down Expand Up @@ -138,12 +138,15 @@ fn cf_data_options(mut opts: rocksdb::Options) -> rocksdb::Options {
opts.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Zstd,
DBCompressionType::Zstd,
DBCompressionType::Zstd,
DBCompressionType::Zstd,
DBCompressionType::Zstd,
]);

opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(DATA_KEY_PREFIX_LENGTH));
opts.set_memtable_prefix_bloom_ratio(0.2);
// most reads are sequential
opts.set_advise_random_on_open(false);
//
Expand All @@ -158,7 +161,7 @@ fn cf_metadata_options(mut opts: rocksdb::Options) -> rocksdb::Options {
opts.set_num_levels(3);
opts.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::None,
DBCompressionType::Snappy,
DBCompressionType::Zstd,
]);
opts.set_max_write_buffer_number(2);
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ fn tokio_builder(common_opts: &CommonOptions) -> tokio::runtime::Builder {
format!("rs:worker-{}", id)
});

if let Some(worker_threads) = common_opts.default_thread_pool_size {
builder.worker_threads(worker_threads);
}
builder.worker_threads(common_opts.default_thread_pool_size());

builder
}
Expand Down
2 changes: 2 additions & 0 deletions crates/rocksdb/src/db_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ impl RocksDbManager {
// /~https://github.com/facebook/rocksdb/blob/f059c7d9b96300091e07429a60f4ad55dac84859/include/rocksdb/table.h#L275
block_opts.set_format_version(5);
block_opts.set_cache_index_and_filter_blocks(true);
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);

block_opts.set_block_cache(&self.cache);
cf_options.set_block_based_table_factory(&block_opts);

Expand Down
23 changes: 12 additions & 11 deletions crates/types/src/config/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ pub struct CommonOptions {
/// # Default async runtime thread pool
///
/// Size of the default thread pool used to perform internal tasks.
/// If not set, it defaults to the number of CPU cores.
/// If not set, it defaults to twice the number of CPU cores.
#[builder(setter(strip_option))]
pub default_thread_pool_size: Option<usize>,
default_thread_pool_size: Option<usize>,

/// # Tracing Endpoint
///
Expand Down Expand Up @@ -246,19 +246,20 @@ impl CommonOptions {
}

pub fn storage_high_priority_bg_threads(&self) -> NonZeroUsize {
self.storage_high_priority_bg_threads.unwrap_or(
NonZeroUsize::new(4).unwrap()
}

pub fn default_thread_pool_size(&self) -> usize {
2 * self.default_thread_pool_size.unwrap_or(
std::thread::available_parallelism()
// Shouldn't really fail, but just in case.
.unwrap_or(NonZeroUsize::new(4).unwrap()),
.unwrap_or(NonZeroUsize::new(4).unwrap())
.get(),
)
}

pub fn storage_low_priority_bg_threads(&self) -> NonZeroUsize {
self.storage_low_priority_bg_threads.unwrap_or(
std::thread::available_parallelism()
// Shouldn't really fail, but just in case.
.unwrap_or(NonZeroUsize::new(4).unwrap()),
)
NonZeroUsize::new(4).unwrap()
}

pub fn rocksdb_bg_threads(&self) -> NonZeroU32 {
Expand Down Expand Up @@ -301,8 +302,8 @@ impl Default for CommonOptions {
default_thread_pool_size: None,
storage_high_priority_bg_threads: None,
storage_low_priority_bg_threads: None,
rocksdb_total_memtables_ratio: 0.5, // (50% of rocksdb-total-memory-size)
rocksdb_total_memory_size: NonZeroUsize::new(4_000_000_000).unwrap(), // 4GB
rocksdb_total_memtables_ratio: 0.6, // (60% of rocksdb-total-memory-size)
rocksdb_total_memory_size: NonZeroUsize::new(6_000_000_000).unwrap(), // 4GB
rocksdb_bg_threads: None,
rocksdb_high_priority_bg_threads: NonZeroU32::new(2).unwrap(),
rocksdb_write_stall_threshold: std::time::Duration::from_secs(3).into(),
Expand Down
2 changes: 2 additions & 0 deletions crates/types/src/config/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ impl RocksDbOptions {
// Assuming 256MB for bifrost's data cf (2 memtables * 128MB default write buffer size)
// Assuming 256MB for bifrost's metadata cf (2 memtables * 128MB default write buffer size)
let buffer_size = (all_memtables - 512_000_000) / (num_partitions * 3) as usize;
// reduce the buffer_size by 10% for safety
let buffer_size = (buffer_size as f64 * 0.9) as usize;
NonZeroUsize::new(buffer_size).unwrap()
})
}
Expand Down
4 changes: 2 additions & 2 deletions crates/types/src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl WorkerOptions {
impl Default for WorkerOptions {
fn default() -> Self {
Self {
internal_queue_length: NonZeroUsize::new(64).unwrap(),
internal_queue_length: NonZeroUsize::new(6400).unwrap(),
num_timers_in_memory_limit: None,
storage: StorageOptions::default(),
invoker: Default::default(),
Expand Down Expand Up @@ -181,7 +181,7 @@ impl Default for InvokerOptions {
message_size_warning: NonZeroUsize::new(10_000_000).unwrap(), // 10MB
message_size_limit: None,
tmp_dir: None,
concurrent_invocations_limit: None,
concurrent_invocations_limit: Some(NonZeroUsize::new(10_000).unwrap()),
disable_eager_state: false,
}
}
Expand Down

0 comments on commit 70acd6f

Please sign in to comment.