Skip to content

Commit

Permalink
Introducing per-partition PartitionStore
Browse files Browse the repository at this point in the history
**IMPORTANT:** This breaks queries through datafusion until we workout how data fusion will shard queries across partitions.
  • Loading branch information
AhmedSoliman committed Apr 29, 2024
1 parent 05eb4ba commit 08fb8f4
Show file tree
Hide file tree
Showing 42 changed files with 1,088 additions and 845 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ prost-build = "0.12.1"
prost-types = "0.12.1"
rand = "0.8.5"
rayon = { version = "1.10" }
rocksdb = { version = "0.22.0", features = ["multi-threaded-cf"], git = "/~https://github.com/restatedev/rust-rocksdb", branch="next" }
rocksdb = { version = "0.22.0", features = ["multi-threaded-cf"], git = "/~https://github.com/restatedev/rust-rocksdb", rev="c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7" }
rustls = "0.21.6"
schemars = { version = "0.8", features = ["bytes", "enumset"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
3 changes: 3 additions & 0 deletions crates/rocksdb/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub enum RocksError {
#[error("unknown column family: {0}")]
#[code(unknown)]
UnknownColumnFamily(CfName),
#[error("already open")]
#[code(unknown)]
AlreadyOpen,
#[error(transparent)]
#[code(unknown)]
Other(#[from] rocksdb::Error),
Expand Down
4 changes: 3 additions & 1 deletion crates/rocksdb/src/rock_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use rocksdb::perf::MemoryUsageBuilder;
use rocksdb::ColumnFamilyDescriptor;
use rocksdb::MultiThreaded;
use tracing::info;
use tracing::trace;

use crate::BoxedCfMatcher;
Expand Down Expand Up @@ -157,7 +158,7 @@ impl RocksAccess for rocksdb::DB {
cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>,
) -> Result<(), RocksError> {
let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?;
Ok(rocksdb::DB::create_cf(self, name.as_str(), &options)?)
Ok(Self::create_cf(self, name.as_str(), &options)?)
}

fn flush_memtables(&self, cfs: &[CfName], wait: bool) -> Result<(), RocksError> {
Expand Down Expand Up @@ -272,6 +273,7 @@ impl RocksAccess for rocksdb::OptimisticTransactionDB<MultiThreaded> {
cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>,
) -> Result<(), RocksError> {
let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?;
info!("Opening CF: {}", name);
Ok(Self::create_cf(self, name.as_str(), &options)?)
}

Expand Down
4 changes: 2 additions & 2 deletions crates/storage-query-datafusion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use datafusion::prelude::{SessionConfig, SessionContext};
use restate_invoker_api::StatusHandle;
use restate_schema_api::deployment::DeploymentResolver;
use restate_schema_api::service::ServiceMetadataResolver;
use restate_storage_rocksdb::RocksDBStorage;
use restate_storage_rocksdb::PartitionStore;
use restate_types::config::QueryEngineOptions;

use crate::{analyzer, physical_optimizer};
Expand Down Expand Up @@ -86,7 +86,7 @@ impl Default for QueryContext {
impl QueryContext {
pub fn from_options(
options: &QueryEngineOptions,
rocksdb: RocksDBStorage,
rocksdb: PartitionStore,
status: impl StatusHandle + Send + Sync + Debug + Clone + 'static,
schemas: impl DeploymentResolver
+ ServiceMetadataResolver
Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/inbox/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ pub use datafusion_expr::UserDefinedLogicalNode;
use futures::{Stream, StreamExt};
use restate_storage_api::inbox_table::{InboxTable, SequenceNumberInboxEntry};
use restate_storage_api::StorageError;
use restate_storage_rocksdb::RocksDBStorage;
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::PartitionKey;
use tokio::sync::mpsc::Sender;

pub(crate) fn register_self(
ctx: &QueryContext,
storage: RocksDBStorage,
storage: PartitionStore,
) -> datafusion::common::Result<()> {
let table = GenericTableProvider::new(InboxBuilder::schema(), Arc::new(InboxScanner(storage)));

Expand All @@ -41,7 +41,7 @@ pub(crate) fn register_self(
}

#[derive(Debug, Clone)]
struct InboxScanner(RocksDBStorage);
struct InboxScanner(PartitionStore);

impl RangeScanner for InboxScanner {
fn scan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use datafusion::physical_plan::SendableRecordBatchStream;
pub use datafusion_expr::UserDefinedLogicalNode;
use restate_storage_rocksdb::invocation_status_table::OwnedInvocationStatusRow;
use restate_storage_rocksdb::RocksDBStorage;
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::PartitionKey;
use tokio::sync::mpsc::Sender;

pub(crate) fn register_self(
ctx: &QueryContext,
storage: RocksDBStorage,
storage: PartitionStore,
) -> datafusion::common::Result<()> {
let status_table = GenericTableProvider::new(
InvocationStatusBuilder::schema(),
Expand All @@ -42,7 +42,7 @@ pub(crate) fn register_self(
}

#[derive(Debug, Clone)]
struct StatusScanner(RocksDBStorage);
struct StatusScanner(PartitionStore);

impl RangeScanner for StatusScanner {
fn scan(
Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/journal/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use datafusion::physical_plan::SendableRecordBatchStream;
pub use datafusion_expr::UserDefinedLogicalNode;
use restate_storage_rocksdb::journal_table::OwnedJournalRow;
use restate_storage_rocksdb::RocksDBStorage;
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::PartitionKey;
use tokio::sync::mpsc::Sender;

pub(crate) fn register_self(
ctx: &QueryContext,
storage: RocksDBStorage,
storage: PartitionStore,
) -> datafusion::common::Result<()> {
let journal_table =
GenericTableProvider::new(JournalBuilder::schema(), Arc::new(JournalScanner(storage)));
Expand All @@ -40,7 +40,7 @@ pub(crate) fn register_self(
}

#[derive(Debug, Clone)]
struct JournalScanner(RocksDBStorage);
struct JournalScanner(PartitionStore);

impl RangeScanner for JournalScanner {
fn scan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use datafusion::physical_plan::SendableRecordBatchStream;
pub use datafusion_expr::UserDefinedLogicalNode;
use restate_storage_rocksdb::service_status_table::OwnedVirtualObjectStatusRow;
use restate_storage_rocksdb::RocksDBStorage;
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::PartitionKey;
use tokio::sync::mpsc::Sender;

pub(crate) fn register_self(
ctx: &QueryContext,
storage: RocksDBStorage,
storage: PartitionStore,
) -> datafusion::common::Result<()> {
let status_table = GenericTableProvider::new(
KeyedServiceStatusBuilder::schema(),
Expand All @@ -42,7 +42,7 @@ pub(crate) fn register_self(
}

#[derive(Debug, Clone)]
struct VirtualObjectStatusScanner(RocksDBStorage);
struct VirtualObjectStatusScanner(PartitionStore);

impl RangeScanner for VirtualObjectStatusScanner {
fn scan(
Expand Down
25 changes: 18 additions & 7 deletions crates/storage-query-datafusion/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use restate_schema_api::deployment::mocks::MockDeploymentMetadataRegistry;
use restate_schema_api::deployment::{Deployment, DeploymentResolver};
use restate_schema_api::service::mocks::MockServiceMetadataResolver;
use restate_schema_api::service::{ServiceMetadata, ServiceMetadataResolver};
use restate_storage_rocksdb::RocksDBStorage;
use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_types::arc_util::Constant;
use restate_types::config::{CommonOptions, QueryEngineOptions, WorkerOptions};
use restate_types::identifiers::{DeploymentId, ServiceRevision};
use restate_types::identifiers::{DeploymentId, PartitionKey, ServiceRevision};
use restate_types::invocation::ServiceType;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::RangeInclusive;

#[derive(Default, Clone, Debug)]
pub(crate) struct MockSchemas(
Expand Down Expand Up @@ -74,7 +75,7 @@ impl DeploymentResolver for MockSchemas {
}
}

pub(crate) struct MockQueryEngine(RocksDBStorage, QueryContext);
pub(crate) struct MockQueryEngine(PartitionStore, QueryContext);

impl MockQueryEngine {
pub async fn create_with(
Expand All @@ -92,12 +93,22 @@ impl MockQueryEngine {
RocksDbManager::init(Constant::new(CommonOptions::default()))
});
let worker_options = WorkerOptions::default();
let rocksdb = RocksDBStorage::open(
let manager = PartitionStoreManager::create(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb),
Constant::new(worker_options.storage.rocksdb.clone()),
&[(0, RangeInclusive::new(0, PartitionKey::MAX))],
)
.await
.expect("RocksDB storage creation should succeed");
.expect("DB creation succeeds");
let rocksdb = manager
.open_partition_store(
0,
RangeInclusive::new(0, PartitionKey::MAX),
OpenMode::CreateIfMissing,
&worker_options.storage.rocksdb,
)
.await
.expect("column family is open");

Self(
rocksdb.clone(),
Expand All @@ -110,7 +121,7 @@ impl MockQueryEngine {
Self::create_with(MockStatusHandle::default(), MockSchemas::default()).await
}

pub fn rocksdb_mut(&mut self) -> &mut RocksDBStorage {
pub fn rocksdb_mut(&mut self) -> &mut PartitionStore {
&mut self.0
}

Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/state/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use datafusion::physical_plan::SendableRecordBatchStream;
pub use datafusion_expr::UserDefinedLogicalNode;
use restate_storage_rocksdb::state_table::OwnedStateRow;
use restate_storage_rocksdb::RocksDBStorage;
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::PartitionKey;
use tokio::sync::mpsc::Sender;

pub(crate) fn register_self(
ctx: &QueryContext,
storage: RocksDBStorage,
storage: PartitionStore,
) -> datafusion::common::Result<()> {
let table = GenericTableProvider::new(StateBuilder::schema(), Arc::new(StateScanner(storage)));

Expand All @@ -39,7 +39,7 @@ pub(crate) fn register_self(
}

#[derive(Debug, Clone)]
struct StateScanner(RocksDBStorage);
struct StateScanner(PartitionStore);

impl RangeScanner for StateScanner {
fn scan(
Expand Down
4 changes: 3 additions & 1 deletion crates/storage-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ codederror = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
drain = { workspace = true }
enum-map = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
once_cell = "1.18.0"
once_cell = { workspace = true }
paste = { workspace = true }
prost = { workspace = true }
rocksdb = { workspace = true }
Expand All @@ -40,6 +41,7 @@ sync_wrapper = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
Expand Down
43 changes: 29 additions & 14 deletions crates/storage-rocksdb/benches/basic_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,22 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::RangeInclusive;

use criterion::{criterion_group, criterion_main, Criterion};
use restate_core::TaskCenterBuilder;
use restate_rocksdb::RocksDbManager;
use restate_storage_api::deduplication_table::{
DedupSequenceNumber, DeduplicationTable, ProducerId,
};
use restate_storage_api::Transaction;
use restate_storage_rocksdb::RocksDBStorage;
use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_types::arc_util::Constant;
use restate_types::config::{CommonOptions, WorkerOptions};
use restate_types::identifiers::PartitionKey;
use tokio::runtime::Builder;

async fn writing_to_rocksdb(worker_options: WorkerOptions) {
//
// setup
//
let mut rocksdb = RocksDBStorage::open(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");

async fn writing_to_rocksdb(mut rocksdb: PartitionStore) {
//
// write
//
Expand All @@ -52,16 +45,38 @@ fn basic_writing_reading_benchmark(c: &mut Criterion) {
.build()
.expect("task_center builds");

let worker_options = WorkerOptions::default();
tc.run_in_scope_sync("db-manager-init", None, || {
RocksDbManager::init(Constant::new(CommonOptions::default()))
});
let rocksdb = tc.block_on("test-setup", None, async {
//
// setup
//
let manager = PartitionStoreManager::create(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb.clone()),
&[(0, RangeInclusive::new(0, PartitionKey::MAX))],
)
.await
.expect("DB creation succeeds");
manager
.open_partition_store(
0,
RangeInclusive::new(0, PartitionKey::MAX),
OpenMode::CreateIfMissing,
&worker_options.storage.rocksdb,
)
.await
.expect("column family is open")
});

let mut group = c.benchmark_group("RocksDB");
group.sample_size(10).bench_function("writing", |bencher| {
// This will generate a temp dir since we have test-util feature enabled
let worker_options = WorkerOptions::default();
bencher
.to_async(&rt)
.iter(|| writing_to_rocksdb(worker_options.clone()));
.iter(|| writing_to_rocksdb(rocksdb.clone()));
});

group.finish();
Expand Down
4 changes: 2 additions & 2 deletions crates/storage-rocksdb/src/deduplication_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use crate::keys::{define_table_key, KeyKind, TableKey};
use crate::TableKind::Deduplication;
use crate::{
RocksDBStorage, RocksDBTransaction, StorageAccess, TableScan, TableScanIterationDecision,
PartitionStore, RocksDBTransaction, StorageAccess, TableScan, TableScanIterationDecision,
};
use futures::Stream;
use futures_util::stream;
Expand Down Expand Up @@ -67,7 +67,7 @@ fn get_all_sequence_numbers<S: StorageAccess>(
))
}

impl ReadOnlyDeduplicationTable for RocksDBStorage {
impl ReadOnlyDeduplicationTable for PartitionStore {
async fn get_dedup_sequence_number(
&mut self,
partition_id: PartitionId,
Expand Down
Loading

0 comments on commit 08fb8f4

Please sign in to comment.