From 8fb14a9bcc104a64c7de078863ebbaedb9089596 Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Mon, 29 Apr 2024 10:21:42 +0100
Subject: [PATCH] Introducing per-partition PartitionStore

**IMPORTANT:** This breaks queries through datafusion until we work out how
DataFusion will shard queries across partitions.
---
 Cargo.lock | 6 +-
 Cargo.toml | 2 +-
 crates/rocksdb/src/error.rs | 3 +
 crates/rocksdb/src/rock_access.rs | 4 +-
 .../storage-query-datafusion/src/context.rs | 4 +-
 .../src/inbox/table.rs | 6 +-
 .../src/invocation_status/table.rs | 6 +-
 .../src/journal/table.rs | 6 +-
 .../src/keyed_service_status/table.rs | 6 +-
 crates/storage-query-datafusion/src/mocks.rs | 25 +-
 .../src/state/table.rs | 6 +-
 crates/storage-rocksdb/Cargo.toml | 4 +-
 .../benches/basic_benchmark.rs | 43 +-
 .../src/deduplication_table/mod.rs | 4 +-
 crates/storage-rocksdb/src/fsm_table/mod.rs | 4 +-
 .../src/idempotency_table/mod.rs | 4 +-
 .../src/invocation_status_table/mod.rs | 6 +-
 .../storage-rocksdb/src/journal_table/mod.rs | 6 +-
 crates/storage-rocksdb/src/lib.rs | 709 +-----------------
 .../storage-rocksdb/src/outbox_table/mod.rs | 4 +-
 crates/storage-rocksdb/src/partition_store.rs | 698 +++++++++++++++++
 .../src/partition_store_manager.rs | 174 +++++
 .../src/service_status_table/mod.rs | 6 +-
 crates/storage-rocksdb/src/state_table/mod.rs | 6 +-
 crates/storage-rocksdb/src/timer_table/mod.rs | 4 +-
 crates/storage-rocksdb/src/writer.rs | 2 +-
 .../tests/inbox_table_test/mod.rs | 4 +-
 .../storage-rocksdb/tests/integration_test.rs | 24 +-
 .../tests/invocation_status_table_test/mod.rs | 4 +-
 .../tests/journal_table_test/mod.rs | 4 +-
 .../tests/outbox_table_test/mod.rs | 4 +-
 .../tests/state_table_test/mod.rs | 4 +-
 .../tests/timer_table_test/mod.rs | 4 +-
 .../virtual_object_status_table_test/mod.rs | 4 +-
 crates/types/src/config/worker.rs | 6 -
 crates/worker/Cargo.toml | 1 +
 crates/worker/src/lib.rs | 29 +-
 crates/worker/src/partition/leadership/mod.rs | 6 +-
 crates/worker/src/partition/mod.rs | 20 +-
 .../worker/src/partition/state_machine/mod.rs | 20 +-
 crates/worker/src/partition/storage/mod.rs | 1 +
 .../worker/src/partition_processor_manager.rs | 50 +-
 42 files changed, 1088 insertions(+), 845 deletions(-)
 create mode 100644 crates/storage-rocksdb/src/partition_store.rs
 create mode 100644 crates/storage-rocksdb/src/partition_store_manager.rs

diff --git a/Cargo.lock b/Cargo.lock
index dc3cc89f9..9ff04a76f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3612,7 +3612,7 @@ dependencies = [
 [[package]]
 name = "librocksdb-sys"
 version = "0.17.0+9.0.0"
-source = "git+/~https://github.com/restatedev/rust-rocksdb?branch=next#c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7"
+source = "git+/~https://github.com/restatedev/rust-rocksdb?rev=c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7#c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7"
 dependencies = [
  "bindgen",
  "bzip2-sys",
@@ -6107,6 +6107,7 @@ dependencies = [
  "derive_builder",
  "derive_more",
  "drain",
+ "enum-map",
  "futures",
  "futures-util",
  "googletest",
@@ -6133,6 +6134,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-stream",
+ "tracing",
 ]

 [[package]]
@@ -6388,7 +6390,7 @@ dependencies = [
 [[package]]
 name = "rocksdb"
 version = "0.22.0"
-source = "git+/~https://github.com/restatedev/rust-rocksdb?branch=next#c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7"
+source = "git+/~https://github.com/restatedev/rust-rocksdb?rev=c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7#c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7"
 dependencies = [
  "libc",
  "librocksdb-sys",
diff --git a/Cargo.toml b/Cargo.toml
index
3edc9853c..f2fdbc775 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ prost-build = "0.12.1" prost-types = "0.12.1" rand = "0.8.5" rayon = { version = "1.10" } -rocksdb = { version = "0.22.0", features = ["multi-threaded-cf"], git = "/~https://github.com/restatedev/rust-rocksdb", branch="next" } +rocksdb = { version = "0.22.0", features = ["multi-threaded-cf"], git = "/~https://github.com/restatedev/rust-rocksdb", rev="c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7" } rustls = "0.21.6" schemars = { version = "0.8", features = ["bytes", "enumset"] } serde = { version = "1.0", features = ["derive"] } diff --git a/crates/rocksdb/src/error.rs b/crates/rocksdb/src/error.rs index eb399d576..ff3d4bb1a 100644 --- a/crates/rocksdb/src/error.rs +++ b/crates/rocksdb/src/error.rs @@ -24,6 +24,9 @@ pub enum RocksError { #[error("unknown column family: {0}")] #[code(unknown)] UnknownColumnFamily(CfName), + #[error("already open")] + #[code(unknown)] + AlreadyOpen, #[error(transparent)] #[code(unknown)] Other(#[from] rocksdb::Error), diff --git a/crates/rocksdb/src/rock_access.rs b/crates/rocksdb/src/rock_access.rs index ee23be51c..8bd6601cc 100644 --- a/crates/rocksdb/src/rock_access.rs +++ b/crates/rocksdb/src/rock_access.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use rocksdb::perf::MemoryUsageBuilder; use rocksdb::ColumnFamilyDescriptor; use rocksdb::MultiThreaded; +use tracing::info; use tracing::trace; use crate::BoxedCfMatcher; @@ -157,7 +158,7 @@ impl RocksAccess for rocksdb::DB { cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>, ) -> Result<(), RocksError> { let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?; - Ok(rocksdb::DB::create_cf(self, name.as_str(), &options)?) + Ok(Self::create_cf(self, name.as_str(), &options)?) } fn flush_memtables(&self, cfs: &[CfName], wait: bool) -> Result<(), RocksError> { @@ -272,6 +273,7 @@ impl RocksAccess for rocksdb::OptimisticTransactionDB { cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>, ) -> Result<(), RocksError> { let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?; + info!("Opening CF: {}", name); Ok(Self::create_cf(self, name.as_str(), &options)?) 
} diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index 7abb74f77..837c63a2d 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -21,7 +21,7 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use restate_invoker_api::StatusHandle; use restate_schema_api::deployment::DeploymentResolver; use restate_schema_api::service::ServiceMetadataResolver; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::config::QueryEngineOptions; use crate::{analyzer, physical_optimizer}; @@ -86,7 +86,7 @@ impl Default for QueryContext { impl QueryContext { pub fn from_options( options: &QueryEngineOptions, - rocksdb: RocksDBStorage, + rocksdb: PartitionStore, status: impl StatusHandle + Send + Sync + Debug + Clone + 'static, schemas: impl DeploymentResolver + ServiceMetadataResolver diff --git a/crates/storage-query-datafusion/src/inbox/table.rs b/crates/storage-query-datafusion/src/inbox/table.rs index 882ec1c77..727ddc2fc 100644 --- a/crates/storage-query-datafusion/src/inbox/table.rs +++ b/crates/storage-query-datafusion/src/inbox/table.rs @@ -25,13 +25,13 @@ pub use datafusion_expr::UserDefinedLogicalNode; use futures::{Stream, StreamExt}; use restate_storage_api::inbox_table::{InboxTable, SequenceNumberInboxEntry}; use restate_storage_api::StorageError; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; pub(crate) fn register_self( ctx: &QueryContext, - storage: RocksDBStorage, + storage: PartitionStore, ) -> datafusion::common::Result<()> { let table = GenericTableProvider::new(InboxBuilder::schema(), Arc::new(InboxScanner(storage))); @@ -41,7 +41,7 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct InboxScanner(RocksDBStorage); +struct InboxScanner(PartitionStore); impl RangeScanner for InboxScanner { fn scan( diff --git a/crates/storage-query-datafusion/src/invocation_status/table.rs b/crates/storage-query-datafusion/src/invocation_status/table.rs index 64ee00db8..08e4110c6 100644 --- a/crates/storage-query-datafusion/src/invocation_status/table.rs +++ b/crates/storage-query-datafusion/src/invocation_status/table.rs @@ -23,13 +23,13 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; use restate_storage_rocksdb::invocation_status_table::OwnedInvocationStatusRow; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; pub(crate) fn register_self( ctx: &QueryContext, - storage: RocksDBStorage, + storage: PartitionStore, ) -> datafusion::common::Result<()> { let status_table = GenericTableProvider::new( InvocationStatusBuilder::schema(), @@ -42,7 +42,7 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct StatusScanner(RocksDBStorage); +struct StatusScanner(PartitionStore); impl RangeScanner for StatusScanner { fn scan( diff --git a/crates/storage-query-datafusion/src/journal/table.rs b/crates/storage-query-datafusion/src/journal/table.rs index a80bdb191..71d9217a4 100644 --- a/crates/storage-query-datafusion/src/journal/table.rs +++ b/crates/storage-query-datafusion/src/journal/table.rs @@ -23,13 +23,13 @@ use 
datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; use restate_storage_rocksdb::journal_table::OwnedJournalRow; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; pub(crate) fn register_self( ctx: &QueryContext, - storage: RocksDBStorage, + storage: PartitionStore, ) -> datafusion::common::Result<()> { let journal_table = GenericTableProvider::new(JournalBuilder::schema(), Arc::new(JournalScanner(storage))); @@ -40,7 +40,7 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct JournalScanner(RocksDBStorage); +struct JournalScanner(PartitionStore); impl RangeScanner for JournalScanner { fn scan( diff --git a/crates/storage-query-datafusion/src/keyed_service_status/table.rs b/crates/storage-query-datafusion/src/keyed_service_status/table.rs index fc9ddd1b1..66b50d56b 100644 --- a/crates/storage-query-datafusion/src/keyed_service_status/table.rs +++ b/crates/storage-query-datafusion/src/keyed_service_status/table.rs @@ -23,13 +23,13 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; use restate_storage_rocksdb::service_status_table::OwnedVirtualObjectStatusRow; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; pub(crate) fn register_self( ctx: &QueryContext, - storage: RocksDBStorage, + storage: PartitionStore, ) -> datafusion::common::Result<()> { let status_table = GenericTableProvider::new( KeyedServiceStatusBuilder::schema(), @@ -42,7 +42,7 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct VirtualObjectStatusScanner(RocksDBStorage); +struct VirtualObjectStatusScanner(PartitionStore); impl RangeScanner for VirtualObjectStatusScanner { fn scan( diff --git a/crates/storage-query-datafusion/src/mocks.rs b/crates/storage-query-datafusion/src/mocks.rs index d1b495471..319482971 100644 --- a/crates/storage-query-datafusion/src/mocks.rs +++ b/crates/storage-query-datafusion/src/mocks.rs @@ -22,13 +22,14 @@ use restate_schema_api::deployment::mocks::MockDeploymentMetadataRegistry; use restate_schema_api::deployment::{Deployment, DeploymentResolver}; use restate_schema_api::service::mocks::MockServiceMetadataResolver; use restate_schema_api::service::{ServiceMetadata, ServiceMetadataResolver}; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, QueryEngineOptions, WorkerOptions}; -use restate_types::identifiers::{DeploymentId, ServiceRevision}; +use restate_types::identifiers::{DeploymentId, PartitionKey, ServiceRevision}; use restate_types::invocation::ServiceType; use std::fmt::Debug; use std::marker::PhantomData; +use std::ops::RangeInclusive; #[derive(Default, Clone, Debug)] pub(crate) struct MockSchemas( @@ -74,7 +75,7 @@ impl DeploymentResolver for MockSchemas { } } -pub(crate) struct MockQueryEngine(RocksDBStorage, QueryContext); +pub(crate) struct MockQueryEngine(PartitionStore, QueryContext); impl MockQueryEngine { pub async fn create_with( @@ -92,12 +93,22 @@ impl MockQueryEngine { 
RocksDbManager::init(Constant::new(CommonOptions::default())) }); let worker_options = WorkerOptions::default(); - let rocksdb = RocksDBStorage::open( + let manager = PartitionStoreManager::create( Constant::new(worker_options.storage.clone()), - Constant::new(worker_options.storage.rocksdb), + Constant::new(worker_options.storage.rocksdb.clone()), + &[(0, RangeInclusive::new(0, PartitionKey::MAX))], ) .await - .expect("RocksDB storage creation should succeed"); + .expect("DB creation succeeds"); + let rocksdb = manager + .open_partition_store( + 0, + RangeInclusive::new(0, PartitionKey::MAX), + OpenMode::CreateIfMissing, + &worker_options.storage.rocksdb, + ) + .await + .expect("column family is open"); Self( rocksdb.clone(), @@ -110,7 +121,7 @@ impl MockQueryEngine { Self::create_with(MockStatusHandle::default(), MockSchemas::default()).await } - pub fn rocksdb_mut(&mut self) -> &mut RocksDBStorage { + pub fn rocksdb_mut(&mut self) -> &mut PartitionStore { &mut self.0 } diff --git a/crates/storage-query-datafusion/src/state/table.rs b/crates/storage-query-datafusion/src/state/table.rs index c87f289eb..3f95156f2 100644 --- a/crates/storage-query-datafusion/src/state/table.rs +++ b/crates/storage-query-datafusion/src/state/table.rs @@ -23,13 +23,13 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; use restate_storage_rocksdb::state_table::OwnedStateRow; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; pub(crate) fn register_self( ctx: &QueryContext, - storage: RocksDBStorage, + storage: PartitionStore, ) -> datafusion::common::Result<()> { let table = GenericTableProvider::new(StateBuilder::schema(), Arc::new(StateScanner(storage))); @@ -39,7 +39,7 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct StateScanner(RocksDBStorage); +struct StateScanner(PartitionStore); impl RangeScanner for StateScanner { fn scan( diff --git a/crates/storage-rocksdb/Cargo.toml b/crates/storage-rocksdb/Cargo.toml index 425aeeb0f..8ad2e7dbf 100644 --- a/crates/storage-rocksdb/Cargo.toml +++ b/crates/storage-rocksdb/Cargo.toml @@ -25,9 +25,10 @@ codederror = { workspace = true } derive_builder = { workspace = true } derive_more = { workspace = true } drain = { workspace = true } +enum-map = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } -once_cell = "1.18.0" +once_cell = { workspace = true } paste = { workspace = true } prost = { workspace = true } rocksdb = { workspace = true } @@ -40,6 +41,7 @@ sync_wrapper = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } +tracing = { workspace = true } [dev-dependencies] restate-core = { workspace = true, features = ["test-util"] } diff --git a/crates/storage-rocksdb/benches/basic_benchmark.rs b/crates/storage-rocksdb/benches/basic_benchmark.rs index bf00eac4c..c55d20ec5 100644 --- a/crates/storage-rocksdb/benches/basic_benchmark.rs +++ b/crates/storage-rocksdb/benches/basic_benchmark.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
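The mocks above capture the new opening protocol in one place: a node-wide PartitionStoreManager owns the shared RocksDB instance, and each partition obtains its own PartitionStore from it. A minimal sketch of that flow, distilled from the mocks and from the benchmark that follows (the Constant wrapper and default WorkerOptions are taken from those call sites; error handling is reduced to expects, and RocksDbManager::init is assumed to have already run inside a task center):

use std::ops::RangeInclusive;

use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_types::arc_util::Constant;
use restate_types::config::WorkerOptions;
use restate_types::identifiers::PartitionKey;

// Assumes RocksDbManager::init(...) has already run inside a task center,
// exactly as the mocks above do.
async fn open_store_for_partition_zero() -> PartitionStore {
    let worker_options = WorkerOptions::default();

    // One manager per node: it owns the shared RocksDB instance and knows
    // up front which partitions (and hence column families) must exist.
    let manager = PartitionStoreManager::create(
        Constant::new(worker_options.storage.clone()),
        Constant::new(worker_options.storage.rocksdb.clone()),
        &[(0, RangeInclusive::new(0, PartitionKey::MAX))],
    )
    .await
    .expect("DB creation succeeds");

    // One store per partition: opening creates the partition's column
    // family when OpenMode::CreateIfMissing is requested.
    manager
        .open_partition_store(
            0,
            RangeInclusive::new(0, PartitionKey::MAX),
            OpenMode::CreateIfMissing,
            &worker_options.storage.rocksdb,
        )
        .await
        .expect("column family is open")
}
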
+use std::ops::RangeInclusive; + use criterion::{criterion_group, criterion_main, Criterion}; use restate_core::TaskCenterBuilder; use restate_rocksdb::RocksDbManager; @@ -15,22 +17,13 @@ use restate_storage_api::deduplication_table::{ DedupSequenceNumber, DeduplicationTable, ProducerId, }; use restate_storage_api::Transaction; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, WorkerOptions}; +use restate_types::identifiers::PartitionKey; use tokio::runtime::Builder; -async fn writing_to_rocksdb(worker_options: WorkerOptions) { - // - // setup - // - let mut rocksdb = RocksDBStorage::open( - Constant::new(worker_options.storage.clone()), - Constant::new(worker_options.storage.rocksdb), - ) - .await - .expect("RocksDB storage creation should succeed"); - +async fn writing_to_rocksdb(mut rocksdb: PartitionStore) { // // write // @@ -52,16 +45,38 @@ fn basic_writing_reading_benchmark(c: &mut Criterion) { .build() .expect("task_center builds"); + let worker_options = WorkerOptions::default(); tc.run_in_scope_sync("db-manager-init", None, || { RocksDbManager::init(Constant::new(CommonOptions::default())) }); + let rocksdb = tc.block_on("test-setup", None, async { + // + // setup + // + let manager = PartitionStoreManager::create( + Constant::new(worker_options.storage.clone()), + Constant::new(worker_options.storage.rocksdb.clone()), + &[(0, RangeInclusive::new(0, PartitionKey::MAX))], + ) + .await + .expect("DB creation succeeds"); + manager + .open_partition_store( + 0, + RangeInclusive::new(0, PartitionKey::MAX), + OpenMode::CreateIfMissing, + &worker_options.storage.rocksdb, + ) + .await + .expect("column family is open") + }); + let mut group = c.benchmark_group("RocksDB"); group.sample_size(10).bench_function("writing", |bencher| { // This will generate a temp dir since we have test-util feature enabled - let worker_options = WorkerOptions::default(); bencher .to_async(&rt) - .iter(|| writing_to_rocksdb(worker_options.clone())); + .iter(|| writing_to_rocksdb(rocksdb.clone())); }); group.finish(); diff --git a/crates/storage-rocksdb/src/deduplication_table/mod.rs b/crates/storage-rocksdb/src/deduplication_table/mod.rs index c5988df86..95cb1fdc0 100644 --- a/crates/storage-rocksdb/src/deduplication_table/mod.rs +++ b/crates/storage-rocksdb/src/deduplication_table/mod.rs @@ -11,7 +11,7 @@ use crate::keys::{define_table_key, KeyKind, TableKey}; use crate::TableKind::Deduplication; use crate::{ - RocksDBStorage, RocksDBTransaction, StorageAccess, TableScan, TableScanIterationDecision, + PartitionStore, RocksDBTransaction, StorageAccess, TableScan, TableScanIterationDecision, }; use futures::Stream; use futures_util::stream; @@ -67,7 +67,7 @@ fn get_all_sequence_numbers( )) } -impl ReadOnlyDeduplicationTable for RocksDBStorage { +impl ReadOnlyDeduplicationTable for PartitionStore { async fn get_dedup_sequence_number( &mut self, partition_id: PartitionId, diff --git a/crates/storage-rocksdb/src/fsm_table/mod.rs b/crates/storage-rocksdb/src/fsm_table/mod.rs index d72f2d189..bca6a13f5 100644 --- a/crates/storage-rocksdb/src/fsm_table/mod.rs +++ b/crates/storage-rocksdb/src/fsm_table/mod.rs @@ -10,7 +10,7 @@ use crate::keys::{define_table_key, KeyKind}; use crate::TableKind::PartitionStateMachine; -use crate::{RocksDBStorage, RocksDBTransaction, StorageAccess}; +use crate::{PartitionStore, RocksDBTransaction, StorageAccess}; use 
restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable}; use restate_storage_api::Result; use restate_types::identifiers::PartitionId; @@ -54,7 +54,7 @@ fn clear(storage: &mut S, partition_id: PartitionId, state_id: storage.delete_key(&key); } -impl ReadOnlyFsmTable for RocksDBStorage { +impl ReadOnlyFsmTable for PartitionStore { async fn get(&mut self, partition_id: PartitionId, state_id: u64) -> Result> where T: StorageDecode, diff --git a/crates/storage-rocksdb/src/idempotency_table/mod.rs b/crates/storage-rocksdb/src/idempotency_table/mod.rs index fcf996f62..9278dbb16 100644 --- a/crates/storage-rocksdb/src/idempotency_table/mod.rs +++ b/crates/storage-rocksdb/src/idempotency_table/mod.rs @@ -11,7 +11,7 @@ use crate::keys::{define_table_key, KeyKind, TableKey}; use crate::owned_iter::OwnedIterator; use crate::scan::TableScan; -use crate::{RocksDBStorage, TableKind}; +use crate::{PartitionStore, TableKind}; use crate::{RocksDBTransaction, StorageAccess}; use bytes::Bytes; use bytestring::ByteString; @@ -96,7 +96,7 @@ fn delete_idempotency_metadata(storage: &mut S, idempotency_id storage.delete_key(&key); } -impl ReadOnlyIdempotencyTable for RocksDBStorage { +impl ReadOnlyIdempotencyTable for PartitionStore { async fn get_idempotency_metadata( &mut self, idempotency_id: &IdempotencyId, diff --git a/crates/storage-rocksdb/src/invocation_status_table/mod.rs b/crates/storage-rocksdb/src/invocation_status_table/mod.rs index 0c6b9675d..c0e92753e 100644 --- a/crates/storage-rocksdb/src/invocation_status_table/mod.rs +++ b/crates/storage-rocksdb/src/invocation_status_table/mod.rs @@ -11,7 +11,7 @@ use crate::keys::{define_table_key, KeyKind, TableKey}; use crate::owned_iter::OwnedIterator; use crate::TableScan::FullScanPartitionKeyRange; -use crate::{RocksDBStorage, TableKind, TableScanIterationDecision}; +use crate::{PartitionStore, TableKind, TableScanIterationDecision}; use crate::{RocksDBTransaction, StorageAccess}; use futures::Stream; use futures_util::stream; @@ -113,7 +113,7 @@ fn read_invoked_full_invocation_id( } } -impl ReadOnlyInvocationStatusTable for RocksDBStorage { +impl ReadOnlyInvocationStatusTable for PartitionStore { async fn get_invocation_status( &mut self, invocation_id: &InvocationId, @@ -167,7 +167,7 @@ pub struct OwnedInvocationStatusRow { pub invocation_status: InvocationStatus, } -impl RocksDBStorage { +impl PartitionStore { pub fn all_invocation_status( &self, range: RangeInclusive, diff --git a/crates/storage-rocksdb/src/journal_table/mod.rs b/crates/storage-rocksdb/src/journal_table/mod.rs index 1beef0f50..cca28453e 100644 --- a/crates/storage-rocksdb/src/journal_table/mod.rs +++ b/crates/storage-rocksdb/src/journal_table/mod.rs @@ -13,7 +13,7 @@ use crate::keys::{define_table_key, KeyKind}; use crate::owned_iter::OwnedIterator; use crate::scan::TableScan::FullScanPartitionKeyRange; use crate::TableKind::Journal; -use crate::{RocksDBStorage, RocksDBTransaction, StorageAccess}; +use crate::{PartitionStore, RocksDBTransaction, StorageAccess}; use crate::{TableScan, TableScanIterationDecision}; use futures::Stream; use futures_util::stream; @@ -110,7 +110,7 @@ fn delete_journal( } } -impl ReadOnlyJournalTable for RocksDBStorage { +impl ReadOnlyJournalTable for PartitionStore { async fn get_journal_entry( &mut self, invocation_id: &InvocationId, @@ -168,7 +168,7 @@ pub struct OwnedJournalRow { pub journal_entry: JournalEntry, } -impl RocksDBStorage { +impl PartitionStore { pub fn all_journal( &self, range: RangeInclusive, diff --git 
a/crates/storage-rocksdb/src/lib.rs b/crates/storage-rocksdb/src/lib.rs index 11cd48fbd..12855693a 100644 --- a/crates/storage-rocksdb/src/lib.rs +++ b/crates/storage-rocksdb/src/lib.rs @@ -17,713 +17,14 @@ pub mod journal_table; pub mod keys; pub mod outbox_table; mod owned_iter; +mod partition_store; +mod partition_store_manager; pub mod scan; pub mod service_status_table; pub mod state_table; pub mod timer_table; -use crate::keys::TableKey; -use crate::scan::{PhysicalScan, TableScan}; -use crate::TableKind::{ - Deduplication, Idempotency, Inbox, InvocationStatus, Journal, Outbox, PartitionStateMachine, - ServiceStatus, State, Timers, -}; +pub use partition_store::*; +pub use partition_store_manager::*; -use std::sync::Arc; - -use bytes::{Bytes, BytesMut}; -use codederror::CodedError; -use rocksdb::DBCompressionType; -use rocksdb::DBPinnableSlice; -use rocksdb::DBRawIteratorWithThreadMode; -use rocksdb::MultiThreaded; -use rocksdb::PrefixRange; -use rocksdb::ReadOptions; -use rocksdb::{BoundColumnFamily, SliceTransform}; -use static_assertions::const_assert_eq; - -use restate_core::ShutdownError; -use restate_rocksdb::{ - CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDbManager, RocksError, -}; -use restate_storage_api::{Storage, StorageError, Transaction}; -use restate_types::arc_util::Updateable; -use restate_types::config::{RocksDbOptions, StorageOptions}; -use restate_types::identifiers::{PartitionId, PartitionKey}; -use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; - -use self::keys::KeyKind; - -pub type DB = rocksdb::OptimisticTransactionDB; -type TransactionDB<'a> = rocksdb::Transaction<'a, DB>; - -pub type DBIterator<'b> = DBRawIteratorWithThreadMode<'b, DB>; -pub type DBIteratorTransaction<'b> = DBRawIteratorWithThreadMode<'b, rocksdb::Transaction<'b, DB>>; - -// matches the default directory name -const DB_NAME: &str = "db"; - -pub const PARTITION_CF: &str = "data-unpartitioned"; - -//Key prefix is 10 bytes (KeyKind(2) + PartitionKey/Id(8)) -const DB_PREFIX_LENGTH: usize = KeyKind::SERIALIZED_LENGTH + std::mem::size_of::(); - -// If this changes, we need to know. -const_assert_eq!(DB_PREFIX_LENGTH, 10); - -// Ensures that both types have the same length, this makes it possible to -// share prefix extractor in rocksdb. 
-const_assert_eq!( - std::mem::size_of::(), - std::mem::size_of::(), -); - -pub(crate) type Result = std::result::Result; - -pub enum TableScanIterationDecision { - Emit(Result), - Continue, - Break, - BreakWith(Result), -} - -#[inline] -const fn cf_name(_kind: TableKind) -> &'static str { - PARTITION_CF -} - -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum TableKind { - // By Partition ID - PartitionStateMachine, - Deduplication, - Outbox, - Timers, - // By Partition Key - State, - InvocationStatus, - ServiceStatus, - Idempotency, - Inbox, - Journal, -} - -impl TableKind { - pub fn all() -> core::slice::Iter<'static, TableKind> { - static VARIANTS: &[TableKind] = &[ - State, - InvocationStatus, - ServiceStatus, - Idempotency, - Inbox, - Outbox, - Deduplication, - PartitionStateMachine, - Timers, - Journal, - ]; - VARIANTS.iter() - } - - pub const fn key_kinds(self) -> &'static [KeyKind] { - match self { - State => &[KeyKind::State], - InvocationStatus => &[KeyKind::InvocationStatus], - ServiceStatus => &[KeyKind::ServiceStatus], - Idempotency => &[KeyKind::Idempotency], - Inbox => &[KeyKind::Inbox], - Outbox => &[KeyKind::Outbox], - Deduplication => &[KeyKind::Deduplication], - PartitionStateMachine => &[KeyKind::Fsm], - Timers => &[KeyKind::Timers], - Journal => &[KeyKind::Journal], - } - } - - pub fn has_key_kind(self, prefix: &[u8]) -> bool { - self.extract_key_kind(prefix).is_some() - } - - pub fn extract_key_kind(self, prefix: &[u8]) -> Option { - if prefix.len() < KeyKind::SERIALIZED_LENGTH { - return None; - } - let slice = prefix[..KeyKind::SERIALIZED_LENGTH].try_into().unwrap(); - let Some(kind) = KeyKind::from_bytes(slice) else { - // warning - return None; - }; - self.key_kinds().iter().find(|k| **k == kind).copied() - } -} - -#[derive(Debug, thiserror::Error, CodedError)] -pub enum BuildError { - #[error(transparent)] - RocksDbManager( - #[from] - #[code] - RocksError, - ), - #[error("db contains no storage format version")] - #[code(restate_errors::RT0009)] - MissingStorageFormatVersion, - #[error(transparent)] - #[code(unknown)] - Other(#[from] rocksdb::Error), - #[error(transparent)] - #[code(unknown)] - Shutdown(#[from] ShutdownError), -} - -pub struct RocksDBStorage { - db: Arc, - key_buffer: BytesMut, - value_buffer: BytesMut, -} - -impl std::fmt::Debug for RocksDBStorage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RocksDBStorage") - .field("db", &self.db) - .field("key_buffer", &self.key_buffer) - .field("value_buffer", &self.value_buffer) - .finish() - } -} - -impl Clone for RocksDBStorage { - fn clone(&self) -> Self { - RocksDBStorage { - db: self.db.clone(), - key_buffer: BytesMut::default(), - value_buffer: BytesMut::default(), - } - } -} - -fn db_options() -> rocksdb::Options { - let mut db_options = rocksdb::Options::default(); - // no need to retain 1000 log files by default. - // - db_options.set_keep_log_file_num(1); - - // we always need to enable atomic flush in case that the user disables wal at runtime - db_options.set_atomic_flush(true); - - // we always enable manual wal flushing in case that the user enables wal at runtime - db_options.set_manual_wal_flush(true); - - db_options -} - -fn cf_options(mut cf_options: rocksdb::Options) -> rocksdb::Options { - // Actually, we would love to use CappedPrefixExtractor but unfortunately it's neither exposed - // in the C API nor the rust binding. That's okay and we can change it later. 
- cf_options.set_prefix_extractor(SliceTransform::create_fixed_prefix(DB_PREFIX_LENGTH)); - cf_options.set_memtable_prefix_bloom_ratio(0.2); - // Most of the changes are highly temporal, we try to delay flushing - // As much as we can to increase the chances to observe a deletion. - // - cf_options.set_max_write_buffer_number(3); - cf_options.set_min_write_buffer_number_to_merge(2); - // - // Set compactions per level - // - cf_options.set_num_levels(7); - cf_options.set_compression_per_level(&[ - DBCompressionType::None, - DBCompressionType::Snappy, - DBCompressionType::Snappy, - DBCompressionType::Snappy, - DBCompressionType::Snappy, - DBCompressionType::Snappy, - DBCompressionType::Zstd, - ]); - - cf_options -} - -impl RocksDBStorage { - /// Returns the raw rocksdb handle, this should only be used for server operations that - /// require direct access to rocksdb. - pub fn inner(&self) -> Arc { - self.db.clone() - } - - pub async fn open( - mut storage_opts: impl Updateable + Send + 'static, - updateable_opts: impl Updateable + Send + 'static, - ) -> std::result::Result { - let cfs = vec![CfName::new(PARTITION_CF)]; - - let options = storage_opts.load(); - let db_spec = DbSpecBuilder::new( - DbName::new(DB_NAME), - Owner::PartitionProcessor, - options.data_dir(), - db_options(), - ) - // At the moment, all CFs get the same options, that might change in the future. - .add_cf_pattern(CfPrefixPattern::ANY, cf_options) - .ensure_column_families(cfs) - .build_as_optimistic_db(); - - // todo remove this when open_db is async - let rdb = tokio::task::spawn_blocking(move || { - RocksDbManager::get().open_db(updateable_opts, db_spec) - }) - .await - .map_err(|_| ShutdownError)??; - - Ok(Self { - db: rdb, - key_buffer: BytesMut::default(), - value_buffer: BytesMut::default(), - }) - } - - fn table_handle(&self, table_kind: TableKind) -> Arc { - self.db.cf_handle(cf_name(table_kind)).expect( - "This should not happen, this is a Restate bug. Please contact the restate developers.", - ) - } - - fn prefix_iterator(&self, table: TableKind, _key_kind: KeyKind, prefix: Bytes) -> DBIterator { - let table = self.table_handle(table); - let mut opts = ReadOptions::default(); - opts.set_prefix_same_as_start(true); - opts.set_iterate_range(PrefixRange(prefix.clone())); - opts.set_async_io(true); - opts.set_total_order_seek(false); - let mut it = self.db.raw_iterator_cf_opt(&table, opts); - it.seek(prefix); - it - } - - fn range_iterator( - &self, - table: TableKind, - _key: KeyKind, - scan_mode: ScanMode, - from: Bytes, - to: Bytes, - ) -> DBIterator { - let table = self.table_handle(table); - let mut opts = ReadOptions::default(); - // todo: use auto_prefix_mode, at the moment, rocksdb doesn't expose this through the C - // binding. 
- opts.set_total_order_seek(scan_mode == ScanMode::TotalOrder); - opts.set_iterate_range(from.clone()..to); - opts.set_async_io(true); - - let mut it = self.db.raw_iterator_cf_opt(&table, opts); - it.seek(from); - it - } - - #[track_caller] - fn iterator_from( - &self, - scan: TableScan, - ) -> DBRawIteratorWithThreadMode<'_, DB> { - let scan: PhysicalScan = scan.into(); - match scan { - PhysicalScan::Prefix(table, key_kind, prefix) => { - assert!(table.has_key_kind(&prefix)); - self.prefix_iterator(table, key_kind, prefix.freeze()) - } - PhysicalScan::RangeExclusive(table, key_kind, scan_mode, start, end) => { - assert!(table.has_key_kind(&start)); - self.range_iterator(table, key_kind, scan_mode, start.freeze(), end.freeze()) - } - PhysicalScan::RangeOpen(table, key_kind, start) => { - // We delayed the generate the synthetic iterator upper bound until this point - // because we might have different prefix length requirements based on the - // table+key_kind combination and we should keep this knowledge as low-level as - // possible. - // - // make the end has the same length as all prefixes to ensure rocksdb key - // comparator can leverage bloom filters when applicable - // (if auto_prefix_mode is enabled) - let mut end = BytesMut::zeroed(DB_PREFIX_LENGTH); - // We want to ensure that Range scans fall within the same key kind. - // So, we limit the iterator to the upper bound of this prefix - let kind_upper_bound = K::KEY_KIND.exclusive_upper_bound(); - end[..kind_upper_bound.len()].copy_from_slice(&kind_upper_bound); - self.range_iterator( - table, - key_kind, - ScanMode::TotalOrder, - start.freeze(), - end.freeze(), - ) - } - } - } - - #[allow(clippy::needless_lifetimes)] - pub fn transaction(&mut self) -> RocksDBTransaction { - let db = self.db.clone(); - - RocksDBTransaction { - txn: self.db.transaction(), - db, - key_buffer: &mut self.key_buffer, - value_buffer: &mut self.value_buffer, - } - } -} - -impl Storage for RocksDBStorage { - type TransactionType<'a> = RocksDBTransaction<'a>; - - fn transaction(&mut self) -> Self::TransactionType<'_> { - RocksDBStorage::transaction(self) - } -} - -impl StorageAccess for RocksDBStorage { - type DBAccess<'a> - = DB where - Self: 'a,; - - fn iterator_from( - &self, - scan: TableScan, - ) -> DBRawIteratorWithThreadMode<'_, Self::DBAccess<'_>> { - self.iterator_from(scan) - } - - #[inline] - fn cleared_key_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut { - self.key_buffer.clear(); - self.key_buffer.reserve(min_size); - &mut self.key_buffer - } - - #[inline] - fn cleared_value_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut { - self.value_buffer.clear(); - self.value_buffer.reserve(min_size); - &mut self.value_buffer - } - - #[inline] - fn get>(&self, table: TableKind, key: K) -> Result> { - let table = self.table_handle(table); - self.db - .get_pinned_cf(&table, key) - .map_err(|error| StorageError::Generic(error.into())) - } - - #[inline] - fn put_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { - let table = self.table_handle(table); - self.db.put_cf(&table, key, value).unwrap(); - } - - #[inline] - fn delete_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>) { - let table = self.table_handle(table); - self.db.delete_cf(&table, key).unwrap(); - } -} - -pub struct RocksDBTransaction<'a> { - txn: rocksdb::Transaction<'a, DB>, - db: Arc, - key_buffer: &'a mut BytesMut, - value_buffer: &'a mut BytesMut, -} - -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum ScanMode { - /// Scan is 
bound to a single fixed key prefix (partition id, or a single partition key). - WithinPrefix, - /// Scan/iterator requires total order seek, this means that the iterator is not bound to a - /// fixed prefix that matches the column family prefix extractor length. For instance, if - /// scanning data across multiple partition IDs or multiple partition keys. - TotalOrder, -} - -impl<'a> RocksDBTransaction<'a> { - pub(crate) fn prefix_iterator( - &self, - table: TableKind, - _key_kind: KeyKind, - prefix: Bytes, - ) -> DBIteratorTransaction { - let table = self.table_handle(table); - let mut opts = ReadOptions::default(); - opts.set_iterate_range(PrefixRange(prefix.clone())); - opts.set_prefix_same_as_start(true); - opts.set_async_io(true); - opts.set_total_order_seek(false); - - let mut it = self.txn.raw_iterator_cf_opt(&table, opts); - it.seek(prefix); - it - } - - pub(crate) fn range_iterator( - &self, - table: TableKind, - _key_kind: KeyKind, - scan_mode: ScanMode, - from: Bytes, - to: Bytes, - ) -> DBIteratorTransaction { - let table = self.table_handle(table); - let mut opts = ReadOptions::default(); - // todo: use auto_prefix_mode, at the moment, rocksdb doesn't expose this through the C - // binding. - opts.set_total_order_seek(scan_mode == ScanMode::TotalOrder); - opts.set_iterate_range(from.clone()..to); - opts.set_async_io(true); - let mut it = self.txn.raw_iterator_cf_opt(&table, opts); - it.seek(from); - it - } - - pub(crate) fn table_handle(&self, table_kind: TableKind) -> Arc { - self.db.cf_handle(cf_name(table_kind)).expect( - "This should not happen, this is a Restate bug. Please contact the restate developers.", - ) - } -} - -impl<'a> Transaction for RocksDBTransaction<'a> { - async fn commit(self) -> Result<()> { - // We cannot directly commit the txn because it might fail because of unrelated concurrent - // writes to RocksDB. However, it is safe to write the WriteBatch for a given partition, - // because there can only be a single writer (the leading PartitionProcessor). - let write_batch = self.txn.get_writebatch(); - // todo: make async and use configuration to control use of WAL - if write_batch.is_empty() { - return Ok(()); - } - let mut opts = rocksdb::WriteOptions::default(); - // We disable WAL since bifrost is our durable distributed log. - opts.disable_wal(true); - self.db - .write_opt(&write_batch, &rocksdb::WriteOptions::default()) - .map_err(|error| StorageError::Generic(error.into())) - } -} - -impl<'a> StorageAccess for RocksDBTransaction<'a> { - type DBAccess<'b> = TransactionDB<'b> where Self: 'b; - - fn iterator_from( - &self, - scan: TableScan, - ) -> DBRawIteratorWithThreadMode<'_, Self::DBAccess<'_>> { - let scan: PhysicalScan = scan.into(); - match scan { - PhysicalScan::Prefix(table, key_kind, prefix) => { - self.prefix_iterator(table, key_kind, prefix.freeze()) - } - PhysicalScan::RangeExclusive(table, key_kind, scan_mode, start, end) => { - self.range_iterator(table, key_kind, scan_mode, start.freeze(), end.freeze()) - } - PhysicalScan::RangeOpen(table, key_kind, start) => { - // We delayed the generate the synthetic iterator upper bound until this point - // because we might have different prefix length requirements based on the - // table+key_kind combination and we should keep this knowledge as low-level as - // possible. 
- // - // make the end has the same length as all prefixes to ensure rocksdb key - // comparator can leverage bloom filters when applicable - // (if auto_prefix_mode is enabled) - let mut end = BytesMut::zeroed(DB_PREFIX_LENGTH); - // We want to ensure that Range scans fall within the same key kind. - // So, we limit the iterator to the upper bound of this prefix - let kind_upper_bound = K::KEY_KIND.exclusive_upper_bound(); - end[..kind_upper_bound.len()].copy_from_slice(&kind_upper_bound); - self.range_iterator( - table, - key_kind, - ScanMode::WithinPrefix, - start.freeze(), - end.freeze(), - ) - } - } - } - - #[inline] - fn cleared_key_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut { - self.key_buffer.clear(); - self.key_buffer.reserve(min_size); - self.key_buffer - } - - #[inline] - fn cleared_value_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut { - self.value_buffer.clear(); - self.value_buffer.reserve(min_size); - self.value_buffer - } - - #[inline] - fn get>(&self, table: TableKind, key: K) -> Result> { - let table = self.table_handle(table); - self.txn - .get_pinned_cf(&table, key) - .map_err(|error| StorageError::Generic(error.into())) - } - - #[inline] - fn put_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { - let table = self.table_handle(table); - self.txn.put_cf(&table, key, value).unwrap(); - } - - #[inline] - fn delete_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>) { - let table = self.table_handle(table); - self.txn.delete_cf(&table, key).unwrap(); - } -} - -trait StorageAccess { - type DBAccess<'a>: rocksdb::DBAccess - where - Self: 'a; - - fn iterator_from( - &self, - scan: TableScan, - ) -> DBRawIteratorWithThreadMode<'_, Self::DBAccess<'_>>; - - fn cleared_key_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut; - - fn cleared_value_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut; - - fn get>(&self, table: TableKind, key: K) -> Result>; - - fn put_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>); - - fn delete_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>); - - #[inline] - fn put_kv_raw>(&mut self, key: K, value: V) { - let key_buffer = self.cleared_key_buffer_mut(key.serialized_length()); - key.serialize_to(key_buffer); - let key_buffer = key_buffer.split(); - - self.put_cf(K::TABLE, key_buffer, value); - } - - #[inline] - fn put_kv(&mut self, key: K, value: V) { - let key_buffer = self.cleared_key_buffer_mut(key.serialized_length()); - key.serialize_to(key_buffer); - let key_buffer = key_buffer.split(); - - let value_buffer = self.cleared_value_buffer_mut(0); - StorageCodec::encode(&value, value_buffer).unwrap(); - let value_buffer = value_buffer.split(); - - self.put_cf(K::TABLE, key_buffer, value_buffer); - } - - #[inline] - fn delete_key(&mut self, key: &K) { - let buffer = self.cleared_key_buffer_mut(key.serialized_length()); - key.serialize_to(buffer); - let buffer = buffer.split(); - - self.delete_cf(K::TABLE, buffer); - } - - #[inline] - fn get_value(&mut self, key: K) -> Result> - where - K: TableKey, - V: StorageDecode, - { - let mut buf = self.cleared_key_buffer_mut(key.serialized_length()); - key.serialize_to(&mut buf); - let buf = buf.split(); - - match self.get(K::TABLE, &buf) { - Ok(value) => { - let slice = value.as_ref().map(|v| v.as_ref()); - - if let Some(mut slice) = slice { - Ok(Some( - StorageCodec::decode::(&mut slice) - .map_err(|err| StorageError::Generic(err.into()))?, - )) - } else { - Ok(None) - } - } - Err(err) => Err(err), - } - } 
- - #[inline] - fn get_first_blocking(&mut self, scan: TableScan, f: F) -> Result - where - K: TableKey, - F: FnOnce(Option<(&[u8], &[u8])>) -> Result, - { - let iterator = self.iterator_from(scan); - f(iterator.item()) - } - - #[inline] - fn get_kv_raw(&mut self, key: K, f: F) -> Result - where - K: TableKey, - F: FnOnce(&[u8], Option<&[u8]>) -> Result, - { - let mut buf = self.cleared_key_buffer_mut(key.serialized_length()); - key.serialize_to(&mut buf); - let buf = buf.split(); - - match self.get(K::TABLE, &buf) { - Ok(value) => { - let slice = value.as_ref().map(|v| v.as_ref()); - f(&buf, slice) - } - Err(err) => Err(err), - } - } - - #[inline] - fn for_each_key_value_in_place(&self, scan: TableScan, mut op: F) -> Vec> - where - K: TableKey, - F: FnMut(&[u8], &[u8]) -> TableScanIterationDecision, - { - let mut res = Vec::new(); - - let mut iterator = self.iterator_from(scan); - - while let Some((k, v)) = iterator.item() { - match op(k, v) { - TableScanIterationDecision::Emit(result) => { - res.push(result); - iterator.next(); - } - TableScanIterationDecision::BreakWith(result) => { - res.push(result); - break; - } - TableScanIterationDecision::Continue => { - iterator.next(); - continue; - } - TableScanIterationDecision::Break => { - break; - } - }; - } - - res - } -} +use crate::scan::TableScan; diff --git a/crates/storage-rocksdb/src/outbox_table/mod.rs b/crates/storage-rocksdb/src/outbox_table/mod.rs index e36442da4..195995055 100644 --- a/crates/storage-rocksdb/src/outbox_table/mod.rs +++ b/crates/storage-rocksdb/src/outbox_table/mod.rs @@ -10,7 +10,7 @@ use crate::keys::{define_table_key, KeyKind, TableKey}; use crate::TableKind::Outbox; -use crate::{RocksDBStorage, RocksDBTransaction, StorageAccess, TableScan}; +use crate::{PartitionStore, RocksDBTransaction, StorageAccess, TableScan}; use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable}; use restate_storage_api::{Result, StorageError}; @@ -90,7 +90,7 @@ fn truncate_outbox( } } -impl OutboxTable for RocksDBStorage { +impl OutboxTable for PartitionStore { async fn add_message( &mut self, partition_id: PartitionId, diff --git a/crates/storage-rocksdb/src/partition_store.rs b/crates/storage-rocksdb/src/partition_store.rs new file mode 100644 index 000000000..1757f7a1a --- /dev/null +++ b/crates/storage-rocksdb/src/partition_store.rs @@ -0,0 +1,698 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
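Before the constants that open the new file, it helps to spell out the key layout they encode: every key in a partition's column family starts with a fixed 10-byte prefix of KeyKind (2 bytes) followed by a PartitionKey or PartitionId (8 bytes each, kept in sync by a const_assert below), so a single fixed-length prefix extractor serves both addressing schemes. A compact restatement (constant names here are illustrative; the u64 sizes mirror the asserts):

// Sketch of the shared key layout the fixed-prefix extractor relies on:
//
//   [ KeyKind (2 bytes) | PartitionKey or PartitionId (8 bytes) | suffix... ]
//
// Because both id types are 8 bytes wide, one
// SliceTransform::create_fixed_prefix(10) covers the "by partition id"
// tables and the "by partition key" tables alike.
const KEY_KIND_LEN: usize = 2; // KeyKind::SERIALIZED_LENGTH
const ID_LEN: usize = std::mem::size_of::<u64>(); // PartitionKey and PartitionId are both u64
const PREFIX_LEN: usize = KEY_KIND_LEN + ID_LEN;

const _: () = assert!(PREFIX_LEN == 10); // mirrors const_assert_eq!(DB_PREFIX_LENGTH, 10)
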
+ +use std::ops::RangeInclusive; +use std::sync::Arc; + +use bytes::Bytes; +use bytes::BytesMut; +use codederror::CodedError; +use restate_rocksdb::CfName; +use rocksdb::DBCompressionType; +use rocksdb::DBPinnableSlice; +use rocksdb::DBRawIteratorWithThreadMode; +use rocksdb::MultiThreaded; +use rocksdb::PrefixRange; +use rocksdb::ReadOptions; +use rocksdb::{BoundColumnFamily, SliceTransform}; +use static_assertions::const_assert_eq; + +use enum_map::Enum; +use restate_core::ShutdownError; +use restate_rocksdb::{RocksDb, RocksError}; +use restate_storage_api::{Storage, StorageError, Transaction}; + +use restate_types::identifiers::{PartitionId, PartitionKey}; +use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; + +use crate::keys::KeyKind; +use crate::keys::TableKey; +use crate::scan::PhysicalScan; +use crate::scan::TableScan; + +pub type DB = rocksdb::OptimisticTransactionDB; +type TransactionDB<'a> = rocksdb::Transaction<'a, DB>; + +pub type DBIterator<'b> = DBRawIteratorWithThreadMode<'b, DB>; +pub type DBIteratorTransaction<'b> = DBRawIteratorWithThreadMode<'b, rocksdb::Transaction<'b, DB>>; + +// Key prefix is 10 bytes (KeyKind(2) + PartitionKey/Id(8)) +const DB_PREFIX_LENGTH: usize = KeyKind::SERIALIZED_LENGTH + std::mem::size_of::(); + +// If this changes, we need to know. +const_assert_eq!(DB_PREFIX_LENGTH, 10); + +// Ensures that both types have the same length, this makes it possible to +// share prefix extractor in rocksdb. +const_assert_eq!( + std::mem::size_of::(), + std::mem::size_of::(), +); + +pub(crate) type Result = std::result::Result; + +pub enum TableScanIterationDecision { + Emit(Result), + Continue, + Break, + BreakWith(Result), +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Enum, strum_macros::VariantArray)] +pub enum TableKind { + // By Partition ID + PartitionStateMachine, + Deduplication, + Outbox, + Timers, + // By Partition Key + State, + InvocationStatus, + ServiceStatus, + Idempotency, + Inbox, + Journal, +} + +impl TableKind { + pub const fn key_kinds(self) -> &'static [KeyKind] { + match self { + Self::State => &[KeyKind::State], + Self::InvocationStatus => &[KeyKind::InvocationStatus], + Self::ServiceStatus => &[KeyKind::ServiceStatus], + Self::Idempotency => &[KeyKind::Idempotency], + Self::Inbox => &[KeyKind::Inbox], + Self::Outbox => &[KeyKind::Outbox], + Self::Deduplication => &[KeyKind::Deduplication], + Self::PartitionStateMachine => &[KeyKind::Fsm], + Self::Timers => &[KeyKind::Timers], + Self::Journal => &[KeyKind::Journal], + } + } + + pub fn has_key_kind(self, prefix: &[u8]) -> bool { + self.extract_key_kind(prefix).is_some() + } + + pub fn extract_key_kind(self, prefix: &[u8]) -> Option { + if prefix.len() < KeyKind::SERIALIZED_LENGTH { + return None; + } + let slice = prefix[..KeyKind::SERIALIZED_LENGTH].try_into().unwrap(); + let Some(kind) = KeyKind::from_bytes(slice) else { + // warning + return None; + }; + self.key_kinds().iter().find(|k| **k == kind).copied() + } +} + +#[derive(Debug, thiserror::Error, CodedError)] +pub enum BuildError { + #[error(transparent)] + RocksDbManager( + #[from] + #[code] + RocksError, + ), + #[error("db contains no storage format version")] + #[code(restate_errors::RT0009)] + MissingStorageFormatVersion, + #[error(transparent)] + #[code(unknown)] + Other(#[from] rocksdb::Error), + #[error(transparent)] + #[code(unknown)] + Shutdown(#[from] ShutdownError), +} + +pub struct PartitionStore { + raw_db: Arc, + rocksdb: Arc, + partition_id: PartitionId, + data_cf_name: CfName, + key_range: 
RangeInclusive, + key_buffer: BytesMut, + value_buffer: BytesMut, +} + +impl std::fmt::Debug for PartitionStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PartitionStore") + .field("db", &self.raw_db) + .field("partition_id", &self.partition_id) + .field("cf", &self.data_cf_name) + .field("key_buffer", &self.key_buffer.len()) + .field("value_buffer", &self.value_buffer.len()) + .finish() + } +} + +impl Clone for PartitionStore { + fn clone(&self) -> Self { + PartitionStore { + raw_db: self.raw_db.clone(), + rocksdb: self.rocksdb.clone(), + partition_id: self.partition_id, + data_cf_name: self.data_cf_name.clone(), + key_range: self.key_range.clone(), + key_buffer: BytesMut::default(), + value_buffer: BytesMut::default(), + } + } +} + +pub(crate) fn cf_options(mut cf_options: rocksdb::Options) -> rocksdb::Options { + // Actually, we would love to use CappedPrefixExtractor but unfortunately it's neither exposed + // in the C API nor the rust binding. That's okay and we can change it later. + cf_options.set_prefix_extractor(SliceTransform::create_fixed_prefix(DB_PREFIX_LENGTH)); + cf_options.set_memtable_prefix_bloom_ratio(0.2); + // Most of the changes are highly temporal, we try to delay flushing + // As much as we can to increase the chances to observe a deletion. + // + cf_options.set_max_write_buffer_number(3); + cf_options.set_min_write_buffer_number_to_merge(2); + // + // Set compactions per level + // + cf_options.set_num_levels(7); + cf_options.set_compression_per_level(&[ + DBCompressionType::None, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Zstd, + ]); + + cf_options +} + +impl PartitionStore { + /// Returns the raw rocksdb handle, this should only be used for server operations that + /// require direct access to rocksdb. + pub fn inner(&self) -> Arc { + self.raw_db.clone() + } + + pub(crate) fn new( + raw_db: Arc, + rocksdb: Arc, + data_cf_name: CfName, + partition_id: PartitionId, + key_range: RangeInclusive, + ) -> Self { + Self { + raw_db, + rocksdb, + partition_id, + data_cf_name, + key_range, + key_buffer: BytesMut::new(), + value_buffer: BytesMut::new(), + } + } + + pub fn partition_key_range(&self) -> &RangeInclusive { + &self.key_range + } + + pub fn contains_partition_key(&self, key: PartitionKey) -> bool { + self.key_range.contains(&key) + } + + fn table_handle(&self, table_kind: TableKind) -> Arc { + find_cf_handle(&self.rocksdb, &self.data_cf_name, table_kind) + } + + fn prefix_iterator(&self, table: TableKind, _key_kind: KeyKind, prefix: Bytes) -> DBIterator { + let table = self.table_handle(table); + let mut opts = ReadOptions::default(); + opts.set_prefix_same_as_start(true); + opts.set_iterate_range(PrefixRange(prefix.clone())); + opts.set_async_io(true); + opts.set_total_order_seek(false); + let mut it = self.raw_db.raw_iterator_cf_opt(&table, opts); + it.seek(prefix); + it + } + + fn range_iterator( + &self, + table: TableKind, + _key: KeyKind, + scan_mode: ScanMode, + from: Bytes, + to: Bytes, + ) -> DBIterator { + let table = self.table_handle(table); + let mut opts = ReadOptions::default(); + // todo: use auto_prefix_mode, at the moment, rocksdb doesn't expose this through the C + // binding. 
+        opts.set_total_order_seek(scan_mode == ScanMode::TotalOrder);
+        opts.set_iterate_range(from.clone()..to);
+        opts.set_async_io(true);
+
+        let mut it = self.raw_db.raw_iterator_cf_opt(&table, opts);
+        it.seek(from);
+        it
+    }
+
+    #[track_caller]
+    fn iterator_from<K: TableKey>(
+        &self,
+        scan: TableScan<K>,
+    ) -> DBRawIteratorWithThreadMode<'_, DB> {
+        let scan: PhysicalScan = scan.into();
+        match scan {
+            PhysicalScan::Prefix(table, key_kind, prefix) => {
+                assert!(table.has_key_kind(&prefix));
+                self.prefix_iterator(table, key_kind, prefix.freeze())
+            }
+            PhysicalScan::RangeExclusive(table, key_kind, scan_mode, start, end) => {
+                assert!(table.has_key_kind(&start));
+                self.range_iterator(table, key_kind, scan_mode, start.freeze(), end.freeze())
+            }
+            PhysicalScan::RangeOpen(table, key_kind, start) => {
+                // We delay generating the synthetic iterator upper bound until this point
+                // because we might have different prefix length requirements based on the
+                // table+key_kind combination and we should keep this knowledge as low-level as
+                // possible.
+                //
+                // Make the end have the same length as all prefixes to ensure the rocksdb key
+                // comparator can leverage bloom filters when applicable
+                // (if auto_prefix_mode is enabled)
+                let mut end = BytesMut::zeroed(DB_PREFIX_LENGTH);
+                // We want to ensure that Range scans fall within the same key kind.
+                // So, we limit the iterator to the upper bound of this prefix
+                let kind_upper_bound = K::KEY_KIND.exclusive_upper_bound();
+                end[..kind_upper_bound.len()].copy_from_slice(&kind_upper_bound);
+                self.range_iterator(
+                    table,
+                    key_kind,
+                    ScanMode::TotalOrder,
+                    start.freeze(),
+                    end.freeze(),
+                )
+            }
+        }
+    }
+
+    #[allow(clippy::needless_lifetimes)]
+    pub fn transaction(&mut self) -> RocksDBTransaction {
+        let db = self.raw_db.clone();
+        let rocksdb = self.rocksdb.clone();
+        // An optimization to avoid looking up the cf handle every time. If we split into more
+        // column families, we will need to cache those cfs here as well.
+ let data_cf_handle = self + .rocksdb + .cf_handle(&self.data_cf_name) + .unwrap_or_else(|| { + panic!( + "Access a column family that must exist: {}", + &self.data_cf_name + ) + }); + + RocksDBTransaction { + txn: self.raw_db.transaction(), + data_cf_handle, + db, + _rocksdb: rocksdb, + key_buffer: &mut self.key_buffer, + value_buffer: &mut self.value_buffer, + } + } +} + +fn find_cf_handle<'a>( + db: &'a Arc, + data_cf_name: &CfName, + _table_kind: TableKind, +) -> Arc> { + // At the moment, everything is in one cf + db.cf_handle(data_cf_name) + .unwrap_or_else(|| panic!("Access a column family that must exist: {}", data_cf_name)) +} + +impl Storage for PartitionStore { + type TransactionType<'a> = RocksDBTransaction<'a>; + + fn transaction(&mut self) -> Self::TransactionType<'_> { + PartitionStore::transaction(self) + } +} + +impl StorageAccess for PartitionStore { + type DBAccess<'a> + = DB where + Self: 'a,; + + fn iterator_from( + &self, + scan: TableScan, + ) -> DBRawIteratorWithThreadMode<'_, Self::DBAccess<'_>> { + self.iterator_from(scan) + } + + #[inline] + fn cleared_key_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut { + self.key_buffer.clear(); + self.key_buffer.reserve(min_size); + &mut self.key_buffer + } + + #[inline] + fn cleared_value_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut { + self.value_buffer.clear(); + self.value_buffer.reserve(min_size); + &mut self.value_buffer + } + + #[inline] + fn get>(&self, table: TableKind, key: K) -> Result> { + let table = self.table_handle(table); + self.raw_db + .get_pinned_cf(&table, key) + .map_err(|error| StorageError::Generic(error.into())) + } + + #[inline] + fn put_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { + let table = self.table_handle(table); + self.raw_db.put_cf(&table, key, value).unwrap(); + } + + #[inline] + fn delete_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>) { + let table = self.table_handle(table); + self.raw_db.delete_cf(&table, key).unwrap(); + } +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum ScanMode { + /// Scan is bound to a single fixed key prefix (partition id, or a single partition key). + WithinPrefix, + /// Scan/iterator requires total order seek, this means that the iterator is not bound to a + /// fixed prefix that matches the column family prefix extractor length. For instance, if + /// scanning data across multiple partition IDs or multiple partition keys. + TotalOrder, +} + +pub struct RocksDBTransaction<'a> { + txn: rocksdb::Transaction<'a, DB>, + db: Arc, + _rocksdb: Arc, + data_cf_handle: Arc>, + key_buffer: &'a mut BytesMut, + value_buffer: &'a mut BytesMut, +} + +impl<'a> RocksDBTransaction<'a> { + pub(crate) fn prefix_iterator( + &self, + table: TableKind, + _key_kind: KeyKind, + prefix: Bytes, + ) -> DBIteratorTransaction { + let table = self.table_handle(table); + let mut opts = ReadOptions::default(); + opts.set_iterate_range(PrefixRange(prefix.clone())); + opts.set_prefix_same_as_start(true); + opts.set_async_io(true); + opts.set_total_order_seek(false); + + let mut it = self.txn.raw_iterator_cf_opt(table, opts); + it.seek(prefix); + it + } + + pub(crate) fn range_iterator( + &self, + table: TableKind, + _key_kind: KeyKind, + scan_mode: ScanMode, + from: Bytes, + to: Bytes, + ) -> DBIteratorTransaction { + let table = self.table_handle(table); + let mut opts = ReadOptions::default(); + // todo: use auto_prefix_mode, at the moment, rocksdb doesn't expose this through the C + // binding. 
+
+pub struct RocksDBTransaction<'a> {
+    txn: rocksdb::Transaction<'a, DB>,
+    db: Arc<DB>,
+    _rocksdb: Arc<RocksDb>,
+    data_cf_handle: Arc<BoundColumnFamily<'a>>,
+    key_buffer: &'a mut BytesMut,
+    value_buffer: &'a mut BytesMut,
+}
+
+impl<'a> RocksDBTransaction<'a> {
+    pub(crate) fn prefix_iterator(
+        &self,
+        table: TableKind,
+        _key_kind: KeyKind,
+        prefix: Bytes,
+    ) -> DBIteratorTransaction {
+        let table = self.table_handle(table);
+        let mut opts = ReadOptions::default();
+        opts.set_iterate_range(PrefixRange(prefix.clone()));
+        opts.set_prefix_same_as_start(true);
+        opts.set_async_io(true);
+        opts.set_total_order_seek(false);
+
+        let mut it = self.txn.raw_iterator_cf_opt(table, opts);
+        it.seek(prefix);
+        it
+    }
+
+    pub(crate) fn range_iterator(
+        &self,
+        table: TableKind,
+        _key_kind: KeyKind,
+        scan_mode: ScanMode,
+        from: Bytes,
+        to: Bytes,
+    ) -> DBIteratorTransaction {
+        let table = self.table_handle(table);
+        let mut opts = ReadOptions::default();
+        // todo: use auto_prefix_mode; at the moment, rocksdb doesn't expose this
+        // through the C binding.
+        opts.set_total_order_seek(scan_mode == ScanMode::TotalOrder);
+        opts.set_iterate_range(from.clone()..to);
+        opts.set_async_io(true);
+        let mut it = self.txn.raw_iterator_cf_opt(table, opts);
+        it.seek(from);
+        it
+    }
+
+    pub(crate) fn table_handle(&self, _table_kind: TableKind) -> &Arc<BoundColumnFamily<'a>> {
+        // Right now, everything is in one cf; return a reference and save CPU.
+        &self.data_cf_handle
+    }
+}
+
+impl<'a> Transaction for RocksDBTransaction<'a> {
+    async fn commit(self) -> Result<()> {
+        // We cannot directly commit the txn because it might fail because of unrelated
+        // concurrent writes to RocksDB. However, it is safe to write the WriteBatch for
+        // a given partition, because there can only be a single writer (the leading
+        // PartitionProcessor).
+        let write_batch = self.txn.get_writebatch();
+        // todo: make async and use configuration to control use of WAL
+        if write_batch.is_empty() {
+            return Ok(());
+        }
+        let mut opts = rocksdb::WriteOptions::default();
+        // We disable the WAL since bifrost is our durable distributed log.
+        opts.disable_wal(true);
+        self.db
+            .write_opt(&write_batch, &opts)
+            .map_err(|error| StorageError::Generic(error.into()))
+    }
+}
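+
+// For illustration, a typical write path through this transaction type,
+// assuming a `store: &mut PartitionStore` and a key/value built with the
+// table-key machinery (names hypothetical):
+//
+//   let mut txn = store.transaction();
+//   txn.put_kv(my_table_key, my_value); // StorageAccess helper, see below
+//   txn.commit().await?;                // writes the extracted WriteBatch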
+
+impl<'a> StorageAccess for RocksDBTransaction<'a> {
+    type DBAccess<'b> = TransactionDB<'b>
+    where
+        Self: 'b;
+
+    fn iterator_from<K: TableKey>(
+        &self,
+        scan: TableScan<K>,
+    ) -> DBRawIteratorWithThreadMode<'_, Self::DBAccess<'_>> {
+        let scan: PhysicalScan = scan.into();
+        match scan {
+            PhysicalScan::Prefix(table, key_kind, prefix) => {
+                self.prefix_iterator(table, key_kind, prefix.freeze())
+            }
+            PhysicalScan::RangeExclusive(table, key_kind, scan_mode, start, end) => {
+                self.range_iterator(table, key_kind, scan_mode, start.freeze(), end.freeze())
+            }
+            PhysicalScan::RangeOpen(table, key_kind, start) => {
+                // We delay generating the synthetic iterator upper bound until this point
+                // because we might have different prefix length requirements based on the
+                // table+key_kind combination, and we should keep this knowledge as
+                // low-level as possible.
+                //
+                // Make the end key the same length as all prefixes to ensure the rocksdb
+                // key comparator can leverage bloom filters when applicable
+                // (if auto_prefix_mode is enabled).
+                let mut end = BytesMut::zeroed(DB_PREFIX_LENGTH);
+                // We want to ensure that range scans fall within the same key kind.
+                // So, we limit the iterator to the upper bound of this prefix.
+                let kind_upper_bound = K::KEY_KIND.exclusive_upper_bound();
+                end[..kind_upper_bound.len()].copy_from_slice(&kind_upper_bound);
+                self.range_iterator(
+                    table,
+                    key_kind,
+                    ScanMode::WithinPrefix,
+                    start.freeze(),
+                    end.freeze(),
+                )
+            }
+        }
+    }
+
+    #[inline]
+    fn cleared_key_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut {
+        self.key_buffer.clear();
+        self.key_buffer.reserve(min_size);
+        self.key_buffer
+    }
+
+    #[inline]
+    fn cleared_value_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut {
+        self.value_buffer.clear();
+        self.value_buffer.reserve(min_size);
+        self.value_buffer
+    }
+
+    #[inline]
+    fn get<K: AsRef<[u8]>>(&self, table: TableKind, key: K) -> Result<Option<DBPinnableSlice>> {
+        let table = self.table_handle(table);
+        self.txn
+            .get_pinned_cf(table, key)
+            .map_err(|error| StorageError::Generic(error.into()))
+    }
+
+    #[inline]
+    fn put_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
+        let table = self.table_handle(table);
+        self.txn.put_cf(table, key, value).unwrap();
+    }
+
+    #[inline]
+    fn delete_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>) {
+        let table = self.table_handle(table);
+        self.txn.delete_cf(table, key).unwrap();
+    }
+}
+
+pub(crate) trait StorageAccess {
+    type DBAccess<'a>: rocksdb::DBAccess
+    where
+        Self: 'a;
+
+    fn iterator_from<K: TableKey>(
+        &self,
+        scan: TableScan<K>,
+    ) -> DBRawIteratorWithThreadMode<'_, Self::DBAccess<'_>>;
+
+    fn cleared_key_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut;
+
+    fn cleared_value_buffer_mut(&mut self, min_size: usize) -> &mut BytesMut;
+
+    fn get<K: AsRef<[u8]>>(&self, table: TableKind, key: K) -> Result<Option<DBPinnableSlice>>;
+
+    fn put_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>);
+
+    fn delete_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>);
+
+    #[inline]
+    fn put_kv_raw<K: TableKey, V: AsRef<[u8]>>(&mut self, key: K, value: V) {
+        let key_buffer = self.cleared_key_buffer_mut(key.serialized_length());
+        key.serialize_to(key_buffer);
+        let key_buffer = key_buffer.split();
+
+        self.put_cf(K::TABLE, key_buffer, value);
+    }
+
+    #[inline]
+    fn put_kv<K: TableKey, V: StorageEncode>(&mut self, key: K, value: V) {
+        let key_buffer = self.cleared_key_buffer_mut(key.serialized_length());
+        key.serialize_to(key_buffer);
+        let key_buffer = key_buffer.split();
+
+        let value_buffer = self.cleared_value_buffer_mut(0);
+        StorageCodec::encode(&value, value_buffer).unwrap();
+        let value_buffer = value_buffer.split();
+
+        self.put_cf(K::TABLE, key_buffer, value_buffer);
+    }
+
+    #[inline]
+    fn delete_key<K: TableKey>(&mut self, key: &K) {
+        let buffer = self.cleared_key_buffer_mut(key.serialized_length());
+        key.serialize_to(buffer);
+        let buffer = buffer.split();
+
+        self.delete_cf(K::TABLE, buffer);
+    }
+
+    #[inline]
+    fn get_value<K, V>(&mut self, key: K) -> Result<Option<V>>
+    where
+        K: TableKey,
+        V: StorageDecode,
+    {
+        let mut buf = self.cleared_key_buffer_mut(key.serialized_length());
+        key.serialize_to(&mut buf);
+        let buf = buf.split();
+
+        match self.get(K::TABLE, &buf) {
+            Ok(value) => {
+                let slice = value.as_ref().map(|v| v.as_ref());
+
+                if let Some(mut slice) = slice {
+                    Ok(Some(
+                        StorageCodec::decode::<V, _>(&mut slice)
+                            .map_err(|err| StorageError::Generic(err.into()))?,
+                    ))
+                } else {
+                    Ok(None)
+                }
+            }
+            Err(err) => Err(err),
+        }
+    }
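+
+    // For illustration: put_kv/delete_key/get_value above all share one
+    // growable BytesMut per store or transaction. The idiom is:
+    //
+    //   let buf = self.cleared_key_buffer_mut(key.serialized_length());
+    //   key.serialize_to(buf);
+    //   let key_bytes = buf.split(); // owned view; the buffer keeps its
+    //                                // allocation for the next operation
+    //
+    // so steady-state writes serialize keys without fresh allocations.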
+
+    #[inline]
+    fn get_first_blocking<K, F, R>(&mut self, scan: TableScan<K>, f: F) -> Result<R>
+    where
+        K: TableKey,
+        F: FnOnce(Option<(&[u8], &[u8])>) -> Result<R>,
+    {
+        let iterator = self.iterator_from(scan);
+        f(iterator.item())
+    }
+
+    #[inline]
+    fn get_kv_raw<K, F, R>(&mut self, key: K, f: F) -> Result<R>
+    where
+        K: TableKey,
+        F: FnOnce(&[u8], Option<&[u8]>) -> Result<R>,
+    {
+        let mut buf = self.cleared_key_buffer_mut(key.serialized_length());
+        key.serialize_to(&mut buf);
+        let buf = buf.split();
+
+        match self.get(K::TABLE, &buf) {
+            Ok(value) => {
+                let slice = value.as_ref().map(|v| v.as_ref());
+                f(&buf, slice)
+            }
+            Err(err) => Err(err),
+        }
+    }
+
+    #[inline]
+    fn for_each_key_value_in_place<K, F, R>(
+        &self,
+        scan: TableScan<K>,
+        mut op: F,
+    ) -> Vec<Result<R>>
+    where
+        K: TableKey,
+        F: FnMut(&[u8], &[u8]) -> TableScanIterationDecision<R>,
+    {
+        let mut res = Vec::new();
+
+        let mut iterator = self.iterator_from(scan);
+
+        while let Some((k, v)) = iterator.item() {
+            match op(k, v) {
+                TableScanIterationDecision::Emit(result) => {
+                    res.push(result);
+                    iterator.next();
+                }
+                TableScanIterationDecision::BreakWith(result) => {
+                    res.push(result);
+                    break;
+                }
+                TableScanIterationDecision::Continue => {
+                    iterator.next();
+                    continue;
+                }
+                TableScanIterationDecision::Break => {
+                    break;
+                }
+            };
+        }
+
+        res
+    }
+}
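The RangeOpen padding above is easiest to see with concrete bytes. Below is a self-contained sketch of the same computation; DB_PREFIX_LENGTH and the two-byte kind tag are illustrative stand-ins for the crate's actual constants, and exclusive_upper_bound mimics what KeyKind::exclusive_upper_bound is expected to return.

```rust
use bytes::BytesMut;

const DB_PREFIX_LENGTH: usize = 10; // stand-in for the crate's constant
const KIND_TAG: &[u8; 2] = b"st"; // stand-in for a two-byte KeyKind tag

/// Smallest byte string strictly greater than every key starting with `prefix`.
fn exclusive_upper_bound(prefix: &[u8]) -> Vec<u8> {
    let mut upper = prefix.to_vec();
    while let Some(last) = upper.last_mut() {
        if *last < 0xff {
            *last += 1;
            return upper;
        }
        // 0xff overflows: drop it and bump the previous byte instead.
        upper.pop();
    }
    panic!("prefix is all 0xff bytes; no finite upper bound at this width");
}

fn main() {
    let kind_upper_bound = exclusive_upper_bound(KIND_TAG);
    // Pad with zeroes so the bound has the same length as every real prefix,
    // exactly like the RangeOpen arm does.
    let mut end = BytesMut::zeroed(DB_PREFIX_LENGTH);
    end[..kind_upper_bound.len()].copy_from_slice(&kind_upper_bound);
    assert_eq!(&end[..], &[b's', b'u', 0, 0, 0, 0, 0, 0, 0, 0]);
}
```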
diff --git a/crates/storage-rocksdb/src/partition_store_manager.rs b/crates/storage-rocksdb/src/partition_store_manager.rs
new file mode 100644
index 000000000..a2de30eb1
--- /dev/null
+++ b/crates/storage-rocksdb/src/partition_store_manager.rs
@@ -0,0 +1,174 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::collections::BTreeMap;
+use std::ops::RangeInclusive;
+use std::sync::Arc;
+
+use tokio::sync::Mutex;
+use tracing::info;
+
+use restate_core::ShutdownError;
+use restate_rocksdb::{
+    CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDb, RocksDbManager, RocksError,
+};
+use restate_types::arc_util::Updateable;
+use restate_types::config::RocksDbOptions;
+use restate_types::config::StorageOptions;
+use restate_types::identifiers::PartitionId;
+use restate_types::identifiers::PartitionKey;
+
+use crate::cf_options;
+use crate::PartitionStore;
+use crate::DB;
+
+const DB_NAME: &str = "db";
+const PARTITION_CF_PREFIX: &str = "data-";
+
+/// Controls how a partition store is opened.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum OpenMode {
+    CreateIfMissing,
+    OpenExisting,
+}
+
+#[derive(Clone, Debug)]
+pub struct PartitionStoreManager {
+    lookup: Arc<Mutex<PartitionLookup>>,
+    rocksdb: Arc<RocksDb>,
+    raw_db: Arc<DB>,
+}
+
+#[derive(Default, Debug)]
+struct PartitionLookup {
+    live: BTreeMap<PartitionId, PartitionStore>,
+}
+
+impl PartitionStoreManager {
+    pub async fn create(
+        mut storage_opts: impl Updateable<StorageOptions> + Send + 'static,
+        updateable_opts: impl Updateable<RocksDbOptions> + Send + 'static,
+        initial_partition_set: &[(PartitionId, RangeInclusive<PartitionKey>)],
+    ) -> std::result::Result<Self, RocksError> {
+        let options = storage_opts.load();
+
+        // todo: temporary until we completely remove the unpartitioned cf.
+        let mut ensure_cfs = partition_ids_to_cfs(initial_partition_set);
+        ensure_cfs.push(CfName::new("data-unpartitioned"));
+
+        let db_spec = DbSpecBuilder::new(
+            DbName::new(DB_NAME),
+            Owner::PartitionProcessor,
+            options.data_dir(),
+            db_options(),
+        )
+        .add_cf_pattern(CfPrefixPattern::new(PARTITION_CF_PREFIX), cf_options)
+        .ensure_column_families(ensure_cfs)
+        .build_as_optimistic_db();
+
+        let manager = RocksDbManager::get();
+        // todo: remove this when open_db is async
+        let raw_db = tokio::task::spawn_blocking(move || manager.open_db(updateable_opts, db_spec))
+            .await
+            .map_err(|_| ShutdownError)??;
+
+        let rocksdb = manager
+            .get_db(Owner::PartitionProcessor, DbName::new(DB_NAME))
+            .unwrap();
+
+        Ok(Self {
+            raw_db,
+            rocksdb,
+            lookup: Arc::default(),
+        })
+    }
+
+    pub async fn has_partition(&self, partition_id: PartitionId) -> bool {
+        let guard = self.lookup.lock().await;
+        guard.live.contains_key(&partition_id)
+    }
+
+    pub async fn get_partition_store(&self, partition_id: PartitionId) -> Option<PartitionStore> {
+        self.lookup.lock().await.live.get(&partition_id).cloned()
+    }
+
+    #[allow(non_snake_case)]
+    pub fn get_legacy_storage_REMOVE_ME(&self) -> PartitionStore {
+        PartitionStore::new(
+            self.raw_db.clone(),
+            self.rocksdb.clone(),
+            CfName::new("data-unpartitioned"),
+            0,
+            RangeInclusive::new(0, PartitionKey::MAX - 1),
+        )
+    }
+
+    pub async fn open_partition_store(
+        &self,
+        partition_id: PartitionId,
+        partition_key_range: RangeInclusive<PartitionKey>,
+        open_mode: OpenMode,
+        opts: &RocksDbOptions,
+    ) -> std::result::Result<PartitionStore, RocksError> {
+        let mut guard = self.lookup.lock().await;
+        if let Some(store) = guard.live.get(&partition_id) {
+            return Ok(store.clone());
+        }
+        let cf_name = cf_for_partition(partition_id);
+        let already_exists = self.rocksdb.cf_handle(&cf_name).is_some();
+
+        if !already_exists {
+            if open_mode == OpenMode::CreateIfMissing {
+                info!("Initializing storage for partition {}", partition_id);
+                self.rocksdb.open_cf(cf_name.clone(), opts).await?;
+            } else {
+                return Err(RocksError::AlreadyOpen);
+            }
+        }
+
+        let partition_store = PartitionStore::new(
+            self.raw_db.clone(),
+            self.rocksdb.clone(),
+            cf_name,
+            partition_id,
+            partition_key_range,
+        );
+        guard.live.insert(partition_id, partition_store.clone());
+
+        Ok(partition_store)
+    }
+}
+
+fn cf_for_partition(partition_id: PartitionId) -> CfName {
+    CfName::from(format!("{}{}", PARTITION_CF_PREFIX, partition_id))
+}
+
+#[inline]
+fn partition_ids_to_cfs<T>(partition_ids: &[(PartitionId, T)]) -> Vec<CfName> {
+    partition_ids
+        .iter()
+        .map(|(partition, _)| cf_for_partition(*partition))
+        .collect()
+}
+
+fn db_options() -> rocksdb::Options {
+    let mut db_options = rocksdb::Options::default();
+    // No need to retain 1000 log files by default.
+    db_options.set_keep_log_file_num(1);
+
+    // We always need to enable atomic flush in case the user disables the WAL at runtime.
+    db_options.set_atomic_flush(true);
+
+    // We always enable manual WAL flushing in case the user enables the WAL at runtime.
+    db_options.set_manual_wal_flush(true);
+
+    db_options
+}
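A quick sketch of the column-family naming scheme used by the manager: cf_for_partition formats partition N as "data-N". The inverse helper below is hypothetical (the crate does not define one) and only illustrates the convention, including the transitional "data-unpartitioned" CF, which carries no partition ID.

```rust
// Illustrative only; `PartitionId` is assumed here to be a plain integer.
fn cf_for_partition(partition_id: u64) -> String {
    format!("{}{}", "data-", partition_id)
}

// Hypothetical inverse, shown to make the naming convention explicit.
fn partition_for_cf(cf_name: &str) -> Option<u64> {
    cf_name.strip_prefix("data-")?.parse().ok()
}

fn main() {
    assert_eq!(cf_for_partition(7), "data-7");
    assert_eq!(partition_for_cf("data-7"), Some(7));
    // The legacy unpartitioned CF does not encode a partition ID.
    assert_eq!(partition_for_cf("data-unpartitioned"), None);
}
```

Note that open_partition_store is idempotent: it consults the lookup map first, so repeated opens of the same partition hand back the cached store.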
diff --git a/crates/storage-rocksdb/src/service_status_table/mod.rs b/crates/storage-rocksdb/src/service_status_table/mod.rs
index 55b6c1bcc..97562d460 100644
--- a/crates/storage-rocksdb/src/service_status_table/mod.rs
+++ b/crates/storage-rocksdb/src/service_status_table/mod.rs
@@ -11,7 +11,7 @@
 use crate::keys::{define_table_key, KeyKind, TableKey};
 use crate::owned_iter::OwnedIterator;
 use crate::TableScan::FullScanPartitionKeyRange;
-use crate::{RocksDBStorage, TableKind};
+use crate::{PartitionStore, TableKind};
 use crate::{RocksDBTransaction, StorageAccess};
 use bytestring::ByteString;
 use restate_storage_api::service_status_table::{
@@ -75,7 +75,7 @@ fn delete_virtual_object_status(storage: &mut S, service_id: &
     storage.delete_key(&key);
 }
 
-impl ReadOnlyVirtualObjectStatusTable for RocksDBStorage {
+impl ReadOnlyVirtualObjectStatusTable for PartitionStore {
     async fn get_virtual_object_status(
         &mut self,
         service_id: &ServiceId,
@@ -115,7 +115,7 @@ pub struct OwnedVirtualObjectStatusRow {
     pub status: VirtualObjectStatus,
 }
 
-impl RocksDBStorage {
+impl PartitionStore {
     pub fn all_virtual_object_status(
         &self,
         range: RangeInclusive<PartitionKey>,
diff --git a/crates/storage-rocksdb/src/state_table/mod.rs b/crates/storage-rocksdb/src/state_table/mod.rs
index 9bbbfad62..d031ea9e7 100644
--- a/crates/storage-rocksdb/src/state_table/mod.rs
+++ b/crates/storage-rocksdb/src/state_table/mod.rs
@@ -11,7 +11,7 @@
 use crate::keys::{define_table_key, KeyKind, TableKey};
 use crate::owned_iter::OwnedIterator;
 use crate::TableKind::State;
-use crate::{RocksDBStorage, RocksDBTransaction, StorageAccess};
+use crate::{PartitionStore, RocksDBTransaction, StorageAccess};
 use crate::{TableScan, TableScanIterationDecision};
 use bytes::Bytes;
 use bytestring::ByteString;
@@ -115,7 +115,7 @@ fn get_all_user_states(
     )
 }
 
-impl ReadOnlyStateTable for RocksDBStorage {
+impl ReadOnlyStateTable for PartitionStore {
     fn get_user_state(
         &mut self,
         service_id: &ServiceId,
@@ -192,7 +192,7 @@ pub struct OwnedStateRow {
     pub state_value: Bytes,
 }
 
-impl RocksDBStorage {
+impl PartitionStore {
     pub fn all_states(
         &self,
         range: RangeInclusive<PartitionKey>,
diff --git a/crates/storage-rocksdb/src/timer_table/mod.rs b/crates/storage-rocksdb/src/timer_table/mod.rs
index 5a0b3177c..1491cffaf 100644
--- a/crates/storage-rocksdb/src/timer_table/mod.rs
+++ b/crates/storage-rocksdb/src/timer_table/mod.rs
@@ -11,7 +11,7 @@
 use crate::keys::{define_table_key, KeyKind, TableKey};
 use crate::TableKind::Timers;
 use crate::TableScanIterationDecision::Emit;
-use crate::{RocksDBStorage, RocksDBTransaction, StorageAccess};
+use crate::{PartitionStore, RocksDBTransaction, StorageAccess};
 use crate::{TableScan, TableScanIterationDecision};
 use futures::Stream;
 use futures_util::stream;
@@ -121,7 +121,7 @@ fn next_timers_greater_than(
     })
 }
 
-impl TimerTable for RocksDBStorage {
+impl TimerTable for PartitionStore {
     async fn add_timer(&mut self, partition_id: PartitionId, key: &TimerKey, timer: Timer) {
         add_timer(self, partition_id, key, timer)
     }
diff --git a/crates/storage-rocksdb/src/writer.rs b/crates/storage-rocksdb/src/writer.rs
index 239136a36..635baba4b 100644
--- a/crates/storage-rocksdb/src/writer.rs
+++ b/crates/storage-rocksdb/src/writer.rs
@@ -11,7
+11,6 @@ use crate::{WriteBatch, DB}; use futures::ready; use futures_util::FutureExt; -use log::debug; use restate_storage_api::StorageError; use restate_types::arc_util::Updateable; use restate_types::config::StorageOptions; @@ -25,6 +24,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::Sender; +use tracing::debug; pub struct WriteCommand { write_batch: WriteBatch, diff --git a/crates/storage-rocksdb/tests/inbox_table_test/mod.rs b/crates/storage-rocksdb/tests/inbox_table_test/mod.rs index 354e3ee81..cd748c5dd 100644 --- a/crates/storage-rocksdb/tests/inbox_table_test/mod.rs +++ b/crates/storage-rocksdb/tests/inbox_table_test/mod.rs @@ -12,7 +12,7 @@ use crate::{assert_stream_eq, mock_state_mutation}; use once_cell::sync::Lazy; use restate_storage_api::inbox_table::{InboxEntry, InboxTable, SequenceNumberInboxEntry}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationId, ServiceId}; static INBOX_ENTRIES: Lazy> = Lazy::new(|| { @@ -83,7 +83,7 @@ async fn peek_after_delete(table: &mut T) { assert_eq!(result.unwrap(), Some(INBOX_ENTRIES[1].clone())); } -pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) { +pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { let mut txn = rocksdb.transaction(); populate_data(&mut txn).await; diff --git a/crates/storage-rocksdb/tests/integration_test.rs b/crates/storage-rocksdb/tests/integration_test.rs index 5d54d3041..db9857496 100644 --- a/crates/storage-rocksdb/tests/integration_test.rs +++ b/crates/storage-rocksdb/tests/integration_test.rs @@ -13,14 +13,15 @@ use futures::Stream; use restate_core::TaskCenterBuilder; use restate_rocksdb::RocksDbManager; use restate_storage_api::StorageError; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, WorkerOptions}; -use restate_types::identifiers::{InvocationId, ServiceId}; +use restate_types::identifiers::{InvocationId, PartitionKey, ServiceId}; use restate_types::invocation::{InvocationTarget, ServiceInvocation, Source, SpanRelation}; use restate_types::state_mut::ExternalStateMutation; use std::collections::HashMap; use std::fmt::Debug; +use std::ops::RangeInclusive; use std::pin::pin; use tokio_stream::StreamExt; @@ -33,7 +34,7 @@ mod state_table_test; mod timer_table_test; mod virtual_object_status_table_test; -async fn storage_test_environment() -> RocksDBStorage { +async fn storage_test_environment() -> PartitionStore { // // create a rocksdb storage from options // @@ -45,12 +46,23 @@ async fn storage_test_environment() -> RocksDBStorage { RocksDbManager::init(Constant::new(CommonOptions::default())) }); let worker_options = WorkerOptions::default(); - RocksDBStorage::open( + let manager = PartitionStoreManager::create( Constant::new(worker_options.storage.clone()), - Constant::new(worker_options.storage.rocksdb), + Constant::new(worker_options.storage.rocksdb.clone()), + &[], ) .await - .expect("RocksDB storage creation should succeed") + .expect("DB storage creation succeeds"); + // A single partition store that spans all keys. 
+ manager + .open_partition_store( + 0, + RangeInclusive::new(0, PartitionKey::MAX - 1), + OpenMode::CreateIfMissing, + &worker_options.storage.rocksdb, + ) + .await + .expect("DB storage creation succeeds") } #[tokio::test] diff --git a/crates/storage-rocksdb/tests/invocation_status_table_test/mod.rs b/crates/storage-rocksdb/tests/invocation_status_table_test/mod.rs index dc9c8fdfd..216da0157 100644 --- a/crates/storage-rocksdb/tests/invocation_status_table_test/mod.rs +++ b/crates/storage-rocksdb/tests/invocation_status_table_test/mod.rs @@ -20,7 +20,7 @@ use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, JournalMetadata, StatusTimestamps, }; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::InvocationId; use restate_types::invocation::{ HandlerType, InvocationTarget, ServiceInvocationSpanContext, Source, @@ -125,7 +125,7 @@ async fn verify_all_svc_with_status_invoked(txn: &mut assert_stream_eq(stream, expected).await; } -pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) { +pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { let mut txn = rocksdb.transaction(); populate_data(&mut txn).await; diff --git a/crates/storage-rocksdb/tests/journal_table_test/mod.rs b/crates/storage-rocksdb/tests/journal_table_test/mod.rs index ffe974980..7ffbcee57 100644 --- a/crates/storage-rocksdb/tests/journal_table_test/mod.rs +++ b/crates/storage-rocksdb/tests/journal_table_test/mod.rs @@ -14,7 +14,7 @@ use futures_util::StreamExt; use once_cell::sync::Lazy; use restate_storage_api::journal_table::{JournalEntry, JournalTable}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationId, InvocationUuid}; use restate_types::invocation::{InvocationTarget, ServiceInvocationSpanContext}; use restate_types::journal::enriched::{ @@ -124,7 +124,7 @@ async fn verify_journal_deleted(txn: &mut T) { } } -pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) { +pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { let mut txn = rocksdb.transaction(); populate_data(&mut txn).await; diff --git a/crates/storage-rocksdb/tests/outbox_table_test/mod.rs b/crates/storage-rocksdb/tests/outbox_table_test/mod.rs index 6a238f023..74b661fc2 100644 --- a/crates/storage-rocksdb/tests/outbox_table_test/mod.rs +++ b/crates/storage-rocksdb/tests/outbox_table_test/mod.rs @@ -11,7 +11,7 @@ use crate::mock_random_service_invocation; use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; fn mock_outbox_message() -> OutboxMessage { OutboxMessage::ServiceInvocation(mock_random_service_invocation()) @@ -47,7 +47,7 @@ pub(crate) async fn verify_outbox_is_empty_after_truncation(txn: assert_eq!(result, None); } -pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) { +pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { let mut txn = rocksdb.transaction(); populate_data(&mut txn).await; diff --git a/crates/storage-rocksdb/tests/state_table_test/mod.rs b/crates/storage-rocksdb/tests/state_table_test/mod.rs index 2140a521b..cef770678 100644 --- a/crates/storage-rocksdb/tests/state_table_test/mod.rs +++ b/crates/storage-rocksdb/tests/state_table_test/mod.rs @@ -12,7 +12,7 @@ use crate::{assert_stream_eq, 
storage_test_environment}; use bytes::Bytes; use restate_storage_api::state_table::{ReadOnlyStateTable, StateTable}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::ServiceId; async fn populate_data(table: &mut T) { @@ -95,7 +95,7 @@ async fn verify_prefix_scan_after_delete(table: &mut T) { assert_stream_eq(result, expected).await; } -pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) { +pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { let mut txn = rocksdb.transaction(); populate_data(&mut txn).await; diff --git a/crates/storage-rocksdb/tests/timer_table_test/mod.rs b/crates/storage-rocksdb/tests/timer_table_test/mod.rs index 599f0c0d9..fb5de2cff 100644 --- a/crates/storage-rocksdb/tests/timer_table_test/mod.rs +++ b/crates/storage-rocksdb/tests/timer_table_test/mod.rs @@ -12,7 +12,7 @@ use crate::mock_service_invocation; use futures_util::StreamExt; use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationUuid, ServiceId}; use restate_types::invocation::ServiceInvocation; use std::pin::pin; @@ -145,7 +145,7 @@ async fn verify_next_timer_after_deletion(txn: &mut T) { } } -pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) { +pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { let mut txn = rocksdb.transaction(); populate_data(&mut txn).await; diff --git a/crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs b/crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs index b9d1e8f91..1e8299638 100644 --- a/crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs +++ b/crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. use restate_storage_api::service_status_table::{VirtualObjectStatus, VirtualObjectStatusTable}; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationId, InvocationUuid, ServiceId}; const FIXTURE_INVOCATION: InvocationUuid = @@ -41,7 +41,7 @@ async fn verify_point_lookups(txn: &mut T) { ); } -pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) { +pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { let mut txn = rocksdb.transaction(); populate_data(&mut txn).await; diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 8847bc5f9..865ff12fd 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -197,11 +197,6 @@ pub struct StorageOptions { #[serde(flatten)] pub rocksdb: RocksDbOptions, - /// # Sync WAL on flushes - /// - /// If WAL is enabled, this option defines whether the WAL will also be synced on flushes. 
- pub sync_wal_on_flush: bool, - #[cfg(any(test, feature = "test-util"))] #[serde(skip, default = "super::default_arc_tmp")] data_dir: std::sync::Arc, @@ -228,7 +223,6 @@ impl Default for StorageOptions { StorageOptions { rocksdb, - sync_wal_on_flush: false, #[cfg(any(test, feature = "test-util"))] data_dir: super::default_arc_tmp(), } diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 0412deeda..419a797cc 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -32,6 +32,7 @@ restate-invoker-impl = { workspace = true } restate-metadata-store = { workspace = true } restate-network = { workspace = true } restate-node-protocol = { workspace = true } +restate-rocksdb = { workspace = true } restate-schema = { workspace = true } restate-schema-api = { workspace = true, features = [ "service", "subscription"] } restate-serde-util = { workspace = true, features = ["proto"] } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 88ecdb108..b71f38744 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -42,7 +42,7 @@ use restate_schema::UpdateableSchema; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_storage_query_datafusion::context::QueryContext; use restate_storage_query_postgres::service::PostgresQueryService; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::{PartitionStore, PartitionStoreManager}; use crate::invoker_integration::EntryEnricher; use crate::partition::storage::invoker::InvokerStorageReader; @@ -53,7 +53,7 @@ use restate_types::Version; type PartitionProcessor = partition::PartitionProcessor< ProtobufRawEntryCodec, - InvokerChannelServiceHandle>, + InvokerChannelServiceHandle>, >; type ExternalClientIngress = HyperServerIngress; @@ -72,6 +72,12 @@ pub enum BuildError { #[code] restate_storage_rocksdb::BuildError, ), + #[error("failed opening partition store: {0}")] + RocksDb( + #[from] + #[code] + restate_rocksdb::RocksError, + ), #[code(unknown)] Invoker(#[from] restate_invoker_impl::BuildError), } @@ -94,14 +100,14 @@ pub struct Worker { storage_query_postgres: PostgresQueryService, #[allow(clippy::type_complexity)] invoker: InvokerService< - InvokerStorageReader, + InvokerStorageReader, EntryEnricher, UpdateableSchema, >, external_client_ingress: ExternalClientIngress, ingress_kafka: IngressKafkaService, subscription_controller_handle: SubscriptionControllerHandle, - rocksdb_storage: RocksDBStorage, + partition_store_manager: PartitionStoreManager, } impl Worker { @@ -155,15 +161,18 @@ impl Worker { // a really ugly hack (I'm ashamed) until we can decouple opening database(s) // from worker creation, or we make worker creation async. This is a stop gap // to avoid unraveling the entire worker creation process to be async in this change. 
- let rocksdb_storage = futures::executor::block_on(RocksDBStorage::open( + let partition_store_manager = futures::executor::block_on(PartitionStoreManager::create( updateable_config .clone() .map_as_updateable_owned(|c| &c.worker.storage), updateable_config .clone() .map_as_updateable_owned(|c| &c.worker.storage.rocksdb), + &[], ))?; + let legacy_storage = partition_store_manager.get_legacy_storage_REMOVE_ME(); + let invoker = InvokerService::from_options( &config.common.service_client, &config.worker.invoker, @@ -173,7 +182,7 @@ impl Worker { let storage_query_context = QueryContext::from_options( &config.admin.query_engine, - rocksdb_storage.clone(), + legacy_storage.clone(), invoker.status_reader(), schema_view.clone(), )?; @@ -191,7 +200,7 @@ impl Worker { external_client_ingress: ingress_http, ingress_kafka, subscription_controller_handle, - rocksdb_storage, + partition_store_manager, metadata_store_client, }) } @@ -204,10 +213,6 @@ impl Worker { &self.storage_query_context } - pub fn rocksdb_storage(&self) -> &RocksDBStorage { - &self.rocksdb_storage - } - pub async fn run(self, bifrost: Bifrost) -> anyhow::Result<()> { let tc = task_center(); @@ -257,7 +262,7 @@ impl Worker { self.updateable_config.clone(), metadata().my_node_id(), self.metadata_store_client, - self.rocksdb_storage, + self.partition_store_manager, self.networking, bifrost, invoker_handle, diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 0ca108dd0..5d94a6ebb 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -34,14 +34,14 @@ pub(crate) use action_collector::{ActionEffect, ActionEffectStream}; use restate_bifrost::Bifrost; use restate_errors::NotRunningError; use restate_storage_api::deduplication_table::EpochSequenceNumber; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; use restate_wal_protocol::timer::TimerValue; use super::storage::invoker::InvokerStorageReader; -type PartitionStorage = storage::PartitionStorage; +type PartitionStorage = storage::PartitionStorage; type TimerService = restate_timer::TimerService; pub(crate) struct LeaderState { @@ -84,7 +84,7 @@ pub(crate) enum LeadershipState { impl LeadershipState where - InvokerInputSender: restate_invoker_api::ServiceHandle>, + InvokerInputSender: restate_invoker_api::ServiceHandle>, { #[allow(clippy::too_many_arguments)] pub(crate) fn follower( diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 5deefeeb9..dd530e75b 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -17,7 +17,7 @@ use futures::StreamExt; use metrics::counter; use restate_core::metadata; use restate_network::Networking; -use restate_storage_rocksdb::{RocksDBStorage, RocksDBTransaction}; +use restate_storage_rocksdb::{PartitionStore, RocksDBTransaction}; use restate_types::identifiers::{PartitionId, PartitionKey}; use std::fmt::Debug; use std::marker::PhantomData; @@ -53,8 +53,6 @@ pub(super) struct PartitionProcessor { invoker_tx: InvokerInputSender, - rocksdb_storage: RocksDBStorage, - _entry_codec: PhantomData, } @@ -62,7 +60,7 @@ impl PartitionProcessor> + Clone, + restate_invoker_api::ServiceHandle> + Clone, { #[allow(clippy::too_many_arguments)] pub(super) fn new( @@ -71,7 +69,6 @@ where 
num_timers_in_memory_limit: Option, channel_size: usize, invoker_tx: InvokerInputSender, - rocksdb_storage: RocksDBStorage, ) -> Self { Self { partition_id, @@ -80,24 +77,27 @@ where channel_size, invoker_tx, _entry_codec: Default::default(), - rocksdb_storage, } } #[instrument(level = "info", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))] - pub(super) async fn run(self, networking: Networking, bifrost: Bifrost) -> anyhow::Result<()> { + pub(super) async fn run( + self, + networking: Networking, + bifrost: Bifrost, + partition_store: PartitionStore, + ) -> anyhow::Result<()> { let PartitionProcessor { partition_id, partition_key_range, num_timers_in_memory_limit, channel_size, invoker_tx, - rocksdb_storage, .. } = self; let mut partition_storage = - PartitionStorage::new(partition_id, partition_key_range.clone(), rocksdb_storage); + PartitionStorage::new(partition_id, partition_key_range.clone(), partition_store); let mut state_machine = Self::create_state_machine::( &mut partition_storage, @@ -207,7 +207,7 @@ where } async fn create_state_machine( - partition_storage: &mut PartitionStorage, + partition_storage: &mut PartitionStorage, partition_key_range: RangeInclusive, ) -> Result, restate_storage_api::StorageError> where diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index a226b64e4..d5dd66394 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -107,7 +107,7 @@ mod tests { }; use restate_storage_api::state_table::{ReadOnlyStateTable, StateTable}; use restate_storage_api::Transaction; - use restate_storage_rocksdb::RocksDBStorage; + use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_test_util::matchers::*; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, WorkerOptions}; @@ -132,7 +132,7 @@ mod tests { state_machine: StateMachine, // TODO for the time being we use rocksdb storage because we have no mocks for storage interfaces. // Perhaps we could make these tests faster by having those. 
- rocksdb_storage: RocksDBStorage, + rocksdb_storage: PartitionStore, effects_buffer: Effects, } @@ -150,12 +150,22 @@ mod tests { "Using RocksDB temp directory {}", worker_options.storage.data_dir().display() ); - let rocksdb_storage = restate_storage_rocksdb::RocksDBStorage::open( + let manager = PartitionStoreManager::create( Constant::new(worker_options.storage.clone()), - Constant::new(worker_options.storage.rocksdb), + Constant::new(worker_options.storage.rocksdb.clone()), + &[], ) .await .unwrap(); + let rocksdb_storage = manager + .open_partition_store( + 0, + RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX), + OpenMode::CreateIfMissing, + &worker_options.storage.rocksdb, + ) + .await + .unwrap(); Self { state_machine: StateMachine::new( @@ -203,7 +213,7 @@ mod tests { actions } - pub fn storage(&mut self) -> &mut RocksDBStorage { + pub fn storage(&mut self) -> &mut PartitionStore { &mut self.rocksdb_storage } } diff --git a/crates/worker/src/partition/storage/mod.rs b/crates/worker/src/partition/storage/mod.rs index f6d49e84c..3d5926099 100644 --- a/crates/worker/src/partition/storage/mod.rs +++ b/crates/worker/src/partition/storage/mod.rs @@ -46,6 +46,7 @@ use std::ops::RangeInclusive; pub mod invoker; +// todo(asoli): merge into PartitionStore #[derive(Debug, Clone)] pub(crate) struct PartitionStorage { partition_id: PartitionId, diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 121732c50..3f799983e 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -16,7 +16,7 @@ use restate_core::{metadata, task_center, ShutdownError, TaskId, TaskKind}; use restate_invoker_impl::InvokerHandle; use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError}; use restate_network::Networking; -use restate_storage_rocksdb::RocksDBStorage; +use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::ArcSwapExt; use restate_types::config::{UpdateableConfiguration, WorkerOptions}; use restate_types::epoch::EpochMetadata; @@ -36,10 +36,10 @@ pub struct PartitionProcessorManager { running_partition_processors: HashMap, metadata_store_client: MetadataStoreClient, - rocksdb_storage: RocksDBStorage, + partition_store_manager: PartitionStoreManager, networking: Networking, bifrost: Bifrost, - invoker_handle: InvokerHandle>, + invoker_handle: InvokerHandle>, } impl PartitionProcessorManager { @@ -47,17 +47,17 @@ impl PartitionProcessorManager { updateable_config: UpdateableConfiguration, node_id: GenerationalNodeId, metadata_store_client: MetadataStoreClient, - rocksdb_storage: RocksDBStorage, + partition_store_manager: PartitionStoreManager, networking: Networking, bifrost: Bifrost, - invoker_handle: InvokerHandle>, + invoker_handle: InvokerHandle>, ) -> Self { Self { updateable_config, running_partition_processors: HashMap::default(), node_id, metadata_store_client, - rocksdb_storage, + partition_store_manager, networking, bifrost, invoker_handle, @@ -121,19 +121,32 @@ impl PartitionProcessorManager { TaskKind::PartitionProcessor, "partition-processor", Some(processor.partition_id), - async move { - if role == Role::Leader { - Self::claim_leadership( - &mut bifrost, - metadata_store_client, - partition_id, - partition_range, - node_id, - ) - .await?; - } + { + let storage_manager = self.partition_store_manager.clone(); + let options = options.clone(); + async move { + let partition_store = storage_manager 
+ .open_partition_store( + partition_id, + partition_range.clone(), + OpenMode::CreateIfMissing, + &options.storage.rocksdb, + ) + .await?; - processor.run(networking, bifrost).await + if role == Role::Leader { + Self::claim_leadership( + &mut bifrost, + metadata_store_client, + partition_id, + partition_range, + node_id, + ) + .await?; + } + + processor.run(networking, bifrost, partition_store).await + } }, ) } @@ -150,7 +163,6 @@ impl PartitionProcessorManager { options.num_timers_in_memory_limit(), options.internal_queue_length(), self.invoker_handle.clone(), - self.rocksdb_storage.clone(), ) }
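To summarize the new startup flow: each partition processor now opens its own store through the manager before it runs. The following is a condensed, illustrative sketch; the helper itself is hypothetical, and the real spawn closure above also threads networking, bifrost, and leadership claiming through.

```rust
use std::ops::RangeInclusive;

use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_types::config::RocksDbOptions;
use restate_types::identifiers::{PartitionId, PartitionKey};

// Hypothetical helper mirroring the spawn-time flow in PartitionProcessorManager.
async fn open_store_for_processor(
    manager: &PartitionStoreManager,
    partition_id: PartitionId,
    partition_range: RangeInclusive<PartitionKey>,
    opts: &RocksDbOptions,
) -> anyhow::Result<PartitionStore> {
    // Each processor opens (or creates) its own "data-{id}" column family;
    // repeated opens return the cached store from the manager's lookup map.
    let store = manager
        .open_partition_store(partition_id, partition_range, OpenMode::CreateIfMissing, opts)
        .await?;
    Ok(store)
}
```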