Skip to content

Commit

Permalink
Metadata store uses new memory budgeting
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 31, 2024
1 parent faab766 commit 7c0db24
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 84 deletions.
16 changes: 4 additions & 12 deletions crates/metadata-store/src/local/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,11 @@ impl LocalMetadataStoreService {
bind_address,
}
}
pub fn from_options<F, V>(
pub fn from_options(
opts: &MetadataStoreOptions,
rocksdb_options: F,
) -> Result<Self, BuildError>
where
F: Fn() -> V,
V: Updateable<RocksDbOptions> + Send + 'static,
{
let store = LocalMetadataStore::new(
opts.data_dir(),
opts.request_queue_length(),
rocksdb_options,
)?;
rocksdb_options: impl Updateable<RocksDbOptions> + Send + Sync + Clone + 'static,
) -> Result<Self, BuildError> {
let store = LocalMetadataStore::new(opts, rocksdb_options)?;
Ok(LocalMetadataStoreService::new(
store,
opts.bind_address.clone(),
Expand Down
89 changes: 61 additions & 28 deletions crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ use restate_rocksdb::{
RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::RocksDbOptions;
use restate_types::config::{MetadataStoreOptions, RocksDbOptions};
use restate_types::storage::{
StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError,
};
use restate_types::Version;
use rocksdb::{BoundColumnFamily, Options, WriteBatch, WriteOptions, DB};
use std::path::Path;
use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, trace};
Expand Down Expand Up @@ -103,7 +102,7 @@ pub enum BuildError {
pub struct LocalMetadataStore {
db: Arc<DB>,
rocksdb: Arc<RocksDb>,
opts: Box<dyn Updateable<RocksDbOptions> + Send + 'static>,
rocksdb_options: Box<dyn Updateable<RocksDbOptions> + Send + Sync>,
request_rx: RequestReceiver,
buffer: BytesMut,

Expand All @@ -112,51 +111,45 @@ pub struct LocalMetadataStore {
}

impl LocalMetadataStore {
pub fn new<F, V>(
data_dir: impl AsRef<Path>,
request_queue_length: usize,
rocksdb_options: F,
) -> std::result::Result<Self, BuildError>
where
F: Fn() -> V,
V: Updateable<RocksDbOptions> + Send + 'static,
{
let (request_tx, request_rx) = mpsc::channel(request_queue_length);
pub fn new(
options: &MetadataStoreOptions,
updateable_rocksdb_options: impl Updateable<RocksDbOptions> + Clone + Send + Sync + 'static,
) -> std::result::Result<Self, BuildError> {
let (request_tx, request_rx) = mpsc::channel(options.request_queue_length());

let db_name = DbName::new(DB_NAME);
let db_manager = RocksDbManager::get();
let cfs = vec![CfName::new(KV_PAIRS)];
let db_spec = DbSpecBuilder::new(
db_name.clone(),
data_dir.as_ref().to_path_buf(),
Options::default(),
)
.add_cf_pattern(CfPrefixPattern::ANY, |opts| opts)
.ensure_column_families(cfs)
.build_as_db();

let db = db_manager.open_db(rocksdb_options(), db_spec)?;
let db_spec = DbSpecBuilder::new(db_name.clone(), options.data_dir(), db_options(options))
.add_cf_pattern(
CfPrefixPattern::ANY,
cf_options(options.rocksdb_memory_budget()),
)
.ensure_column_families(cfs)
.build_as_db();

let db = db_manager.open_db(updateable_rocksdb_options.clone(), db_spec)?;
let rocksdb = db_manager
.get_db(db_name)
.expect("metadata store db is open");

Ok(Self {
db,
rocksdb,
opts: Box::new(rocksdb_options()),
rocksdb_options: Box::new(updateable_rocksdb_options),
buffer: BytesMut::default(),
request_rx,
request_tx,
})
}

fn write_options(&mut self) -> WriteOptions {
let rocks_db_options = self.opts.load();
let opts = self.rocksdb_options.load();
let mut write_opts = WriteOptions::default();

write_opts.disable_wal(rocks_db_options.rocksdb_disable_wal());
write_opts.disable_wal(opts.rocksdb_disable_wal());

if !rocks_db_options.rocksdb_disable_wal() {
if !opts.rocksdb_disable_wal() {
// always sync if we have wal enabled
write_opts.set_sync(true);
}
Expand Down Expand Up @@ -346,3 +339,43 @@ impl LocalMetadataStore {
}
}
}

/// Produces the database-wide RocksDB options for the metadata store.
///
/// The metadata store options are accepted but currently unused (hence the
/// leading underscore); the returned value is simply the RocksDB default.
fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options {
    Default::default()
}

/// Returns a closure that customizes column-family options for the metadata store.
///
/// The closure takes ownership of a `rocksdb::Options`, derives the memtable and
/// level sizing from `memory_budget` (via `set_memory_related_opts`), selects
/// level-style compaction with three levels, and hands the options back.
/// `memory_budget` is captured by value, so the closure is `'static`.
fn cf_options(
    memory_budget: usize,
) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static {
    move |mut cf_opts| {
        // Memtable sizes and the L0/L1 thresholds are all derived from the budget.
        set_memory_related_opts(&mut cf_opts, memory_budget);

        cf_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
        cf_opts.set_num_levels(3);

        // One entry per level: the first two levels stay uncompressed and only
        // the last level is compressed with Zstd.
        let compression_by_level = [
            DBCompressionType::None,
            DBCompressionType::None,
            DBCompressionType::Zstd,
        ];
        cf_opts.set_compression_per_level(&compression_by_level);

        cf_opts
    }
}

/// Derives the memtable and level-sizing knobs on `opts` from a single budget.
///
/// `memtables_budget` is the total number of bytes the memtables may use. It is
/// split across four write buffers, and the level-0/level-1 thresholds are
/// expressed relative to the same budget.
fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) {
    let budget_bytes = memtables_budget as u64;

    // Four write buffers in total (1 mutable + 3 immutable), each a quarter of
    // the budget.
    opts.set_max_write_buffer_number(4);
    opts.set_write_buffer_size(memtables_budget / 4);
    // Merge 2 memtables when flushing to L0, so each level-0 file is roughly
    // (memtables_budget / 2).
    opts.set_min_write_buffer_number_to_merge(2);

    // Start L0->L1 compaction as early as possible: with two files of about
    // (memtables_budget / 2) each, level 0 is compacted once it grows past
    // memtables_budget.
    opts.set_level_zero_file_num_compaction_trigger(2);
    // Keep level 1 the same size as level 0 so that L0->L1 compactions are fast.
    opts.set_max_bytes_for_level_base(budget_bytes);

    // Not critical, but avoids creating too many files.
    opts.set_target_file_size_base(budget_bytes / 8);
}
42 changes: 22 additions & 20 deletions crates/metadata-store/src/local/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ use futures::StreamExt;
use restate_core::{MockNetworkSender, TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
use restate_grpc_util::create_grpc_channel_from_advertised_address;
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::{CommonOptions, Configuration};
use restate_types::arc_util::{Constant, Updateable};
use restate_types::config::{
reset_base_temp_dir_and_retain, CommonOptions, MetadataStoreOptions, RocksDbOptions,
};
use restate_types::net::{AdvertisedAddress, BindAddress};
use restate_types::retries::RetryPolicy;
use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::time::Duration;
use test_log::test;
use tonic_health::pb::health_client::HealthClient;
Expand Down Expand Up @@ -54,7 +55,7 @@ flexbuffers_storage_encode_decode!(Value);
/// Tests basic operations of the metadata store.
#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn basic_metadata_store_operations() -> anyhow::Result<()> {
let (client, env) = create_test_environment().await?;
let (client, env) = create_test_environment(&MetadataStoreOptions::default()).await?;

env.tc
.run_in_scope("test", None, async move {
Expand Down Expand Up @@ -149,7 +150,7 @@ async fn basic_metadata_store_operations() -> anyhow::Result<()> {
/// Tests multiple concurrent operations issued by the same client
#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn concurrent_operations() -> anyhow::Result<()> {
let (client, env) = create_test_environment().await?;
let (client, env) = create_test_environment(&MetadataStoreOptions::default()).await?;

env.tc
.run_in_scope("test", None, async move {
Expand Down Expand Up @@ -214,9 +215,14 @@ async fn concurrent_operations() -> anyhow::Result<()> {
/// Tests that the metadata store stores values durably so that they can be read after a restart.
#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn durable_storage() -> anyhow::Result<()> {
let rocksdb_path = tempfile::tempdir()?.into_path();
// get current base dir and use this for subsequent tests.
let base_path = reset_base_temp_dir_and_retain();
let tmp = std::env::temp_dir();
let opts = MetadataStoreOptions::default();
assert!(base_path.starts_with(tmp));
assert_eq!(base_path.join("local-metadata-store"), opts.data_dir());

let (client, env) = create_test_environment_with_path(rocksdb_path.clone()).await?;
let (client, env) = create_test_environment(&opts).await?;

// write data
env.tc
Expand Down Expand Up @@ -246,7 +252,7 @@ async fn durable_storage() -> anyhow::Result<()> {
.await;
// reset RocksDbManager to allow restarting the metadata store
RocksDbManager::get().reset().await?;
let client = start_metadata_store(rocksdb_path, &env.tc).await?;
let client = start_metadata_store(&opts, Constant::new(opts.rocksdb.clone()), &env.tc).await?;

// validate data
env.tc
Expand All @@ -269,18 +275,14 @@ async fn durable_storage() -> anyhow::Result<()> {
.await?;

env.tc.shutdown_node("shutdown", 0).await;
std::fs::remove_dir_all(base_path)?;
Ok(())
}

async fn create_test_environment(
) -> anyhow::Result<(MetadataStoreClient, TestCoreEnv<MockNetworkSender>)> {
create_test_environment_with_path(tempfile::tempdir()?.into_path()).await
}

/// Creates a test environment with the [`LocalMetadataStore`] and a [`MetadataStoreClient`]
/// connected to it.
async fn create_test_environment_with_path(
rocksdb_path: impl AsRef<Path>,
async fn create_test_environment(
opts: &MetadataStoreOptions,
) -> anyhow::Result<(MetadataStoreClient, TestCoreEnv<MockNetworkSender>)> {
let env = TestCoreEnvBuilder::new_with_mock_network().build().await;

Expand All @@ -290,18 +292,18 @@ async fn create_test_environment_with_path(
RocksDbManager::init(Constant::new(CommonOptions::default()))
});

let client = start_metadata_store(rocksdb_path, task_center).await?;
let client =
start_metadata_store(opts, Constant::new(opts.rocksdb.clone()), task_center).await?;

Ok((client, env))
}

async fn start_metadata_store(
rocksdb_path: impl AsRef<Path> + Sized,
opts: &MetadataStoreOptions,
updateables_rocksdb_options: impl Updateable<RocksDbOptions> + Send + Sync + Clone + 'static,
task_center: &TaskCenter,
) -> anyhow::Result<MetadataStoreClient> {
let store = LocalMetadataStore::new(rocksdb_path, 32, || {
Configuration::mapped_updateable(|config| &config.metadata_store.rocksdb)
})?;
let store = LocalMetadataStore::new(opts, updateables_rocksdb_options)?;

let uds_path = tempfile::tempdir()?.into_path().join("grpc-server");
let bind_address = BindAddress::Uds(uds_path.clone());
Expand Down
8 changes: 3 additions & 5 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,9 @@ impl Node {
let metadata_store_role = if config.has_role(Role::MetadataStore) {
Some(LocalMetadataStoreService::from_options(
&config.metadata_store,
|| {
updateable_config
.clone()
.map_as_updateable_owned(|config| &config.metadata_store.rocksdb)
},
updateable_config
.clone()
.map_as_updateable_owned(|config| &config.metadata_store.rocksdb),
)?)
} else {
None
Expand Down
11 changes: 6 additions & 5 deletions crates/types/src/arc_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ where
}

/// Make it possible to create an Updateable of a fixed arc value.
#[derive(Clone)]
pub struct Constant<T>(Arc<T>);

impl<T> Constant<T> {
Expand Down Expand Up @@ -128,14 +129,14 @@ pub trait ArcSwapExt<T> {
where
F: FnMut(&Arc<T>) -> &U;

fn map_as_updateable_owned<F, U>(self, f: F) -> impl Updateable<U>
fn map_as_updateable_owned<F, U>(self, f: F) -> impl Updateable<U> + Clone
where
F: FnMut(&Arc<T>) -> &U;
F: FnMut(&Arc<T>) -> &U + Clone;
}

impl<K, T> ArcSwapExt<T> for K
where
K: Deref<Target = ArcSwapAny<Arc<T>>>,
K: Deref<Target = ArcSwapAny<Arc<T>>> + Clone,
T: 'static,
{
fn pinned(&self) -> Pinned<T> {
Expand All @@ -158,9 +159,9 @@ where
cached.map(f)
}

fn map_as_updateable_owned<F, U>(self, f: F) -> impl Updateable<U>
fn map_as_updateable_owned<F, U>(self, f: F) -> impl Updateable<U> + Clone
where
F: FnMut(&Arc<T>) -> &U,
F: FnMut(&Arc<T>) -> &U + Clone,
{
let cached = Cache::new(self);
cached.map(f)
Expand Down
7 changes: 6 additions & 1 deletion crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use restate_serde_util::NonZeroByteCount;
use tracing::warn;

use crate::logs::metadata::ProviderKind;

Expand Down Expand Up @@ -98,7 +99,11 @@ impl LocalLogletOptions {

pub fn rocksdb_memory_budget(&self) -> usize {
self.rocksdb_memory_budget
.expect("rocksdb_memory_budget is set from common")
.unwrap_or_else(|| {
warn!("LocalLoglet rocksdb_memory_budget is not set, defaulting to 1MB");
// 1MB minimum
NonZeroUsize::new(1024 * 1024).unwrap()
})
.get()
}

Expand Down
Loading

0 comments on commit 7c0db24

Please sign in to comment.