diff --git a/Cargo.lock b/Cargo.lock index 8877da3fe..dc3cc89f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6110,7 +6110,6 @@ dependencies = [ "futures", "futures-util", "googletest", - "log", "num-bigint", "once_cell", "paste", @@ -6125,6 +6124,7 @@ dependencies = [ "rocksdb", "schemars", "serde", + "static_assertions", "strum 0.26.2", "strum_macros 0.26.2", "sync_wrapper", diff --git a/crates/node/src/cluster_marker.rs b/crates/node/src/cluster_marker.rs index 1a1789662..26e2951a1 100644 --- a/crates/node/src/cluster_marker.rs +++ b/crates/node/src/cluster_marker.rs @@ -210,6 +210,7 @@ mod tests { let mut file = OpenOptions::new().create(true).write(true).open(path)?; serde_json::to_writer(&file, cluster_marker)?; file.flush()?; + file.sync_all()?; Ok(()) } @@ -253,8 +254,8 @@ mod tests { } #[test] - fn cluster_marker_is_updated() { - let file = NamedTempFile::new().unwrap(); + fn cluster_marker_is_updated() -> anyhow::Result<()> { + let mut file = NamedTempFile::new().unwrap(); let previous_version = Version::new(1, 1, 6); let current_version = Version::new(1, 2, 3); @@ -263,6 +264,7 @@ mod tests { file.path(), ) .unwrap(); + file.flush()?; validate_and_update_cluster_marker_inner( CLUSTER_NAME, @@ -281,12 +283,13 @@ mod tests { max_version: current_version, cluster_name: CLUSTER_NAME.to_owned(), } - ) + ); + Ok(()) } #[test] - fn max_version_is_maintained() { - let file = NamedTempFile::new().unwrap(); + fn max_version_is_maintained() -> anyhow::Result<()> { + let mut file = NamedTempFile::new().unwrap(); let max_version = Version::new(1, 2, 6); let current_version = Version::new(1, 1, 3); @@ -295,6 +298,7 @@ mod tests { file.path(), ) .unwrap(); + file.flush()?; validate_and_update_cluster_marker_inner( CLUSTER_NAME, @@ -313,12 +317,13 @@ mod tests { max_version, cluster_name: CLUSTER_NAME.to_owned(), } - ) + ); + Ok(()) } #[test] - fn incompatible_cluster_name() { - let file = NamedTempFile::new().unwrap(); + fn incompatible_cluster_name() -> anyhow::Result<()> { + let mut file = NamedTempFile::new().unwrap(); let max_version = Version::new(1, 2, 6); let current_version = Version::new(1, 1, 3); @@ -327,6 +332,7 @@ mod tests { file.path(), ) .unwrap(); + file.flush()?; let result = validate_and_update_cluster_marker_inner( CLUSTER_NAME, @@ -337,12 +343,13 @@ mod tests { assert!(matches!( result, Err(ClusterValidationError::IncorrectClusterName { .. }) - )) + )); + Ok(()) } #[test] - fn incompatible_version() { - let file = NamedTempFile::new().unwrap(); + fn incompatible_version() -> anyhow::Result<()> { + let mut file = NamedTempFile::new().unwrap(); let max_version = Version::new(1, 2, 6); let compatibility_boundary = Version::new(1, 1, 1); let current_version = Version::new(1, 0, 3); @@ -352,6 +359,7 @@ mod tests { file.path(), ) .unwrap(); + file.flush()?; let mut compatibility_map = COMPATIBILITY_MAP.deref().clone(); compatibility_map.insert( @@ -368,6 +376,7 @@ mod tests { assert!(matches!( result, Err(ClusterValidationError::IncompatibleVersion { .. }) - )) + )); + Ok(()) } } diff --git a/crates/storage-rocksdb/Cargo.toml b/crates/storage-rocksdb/Cargo.toml index 490ae54ec..425aeeb0f 100644 --- a/crates/storage-rocksdb/Cargo.toml +++ b/crates/storage-rocksdb/Cargo.toml @@ -27,19 +27,19 @@ derive_more = { workspace = true } drain = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +once_cell = "1.18.0" paste = { workspace = true } prost = { workspace = true } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } +static_assertions = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } sync_wrapper = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } -once_cell = "1.18.0" -log = "0.4.20" [dev-dependencies] restate-core = { workspace = true, features = ["test-util"] } diff --git a/crates/storage-rocksdb/src/deduplication_table/mod.rs b/crates/storage-rocksdb/src/deduplication_table/mod.rs index 5393af312..c5988df86 100644 --- a/crates/storage-rocksdb/src/deduplication_table/mod.rs +++ b/crates/storage-rocksdb/src/deduplication_table/mod.rs @@ -47,7 +47,7 @@ fn get_all_sequence_numbers( partition_id: PartitionId, ) -> impl Stream> + Send { stream::iter(storage.for_each_key_value_in_place( - TableScan::Partition::(partition_id), + TableScan::SinglePartition::(partition_id), move |k, mut v| { let key = DeduplicationKey::deserialize_from(&mut Cursor::new(k)).map(|key| key.producer_id); diff --git a/crates/storage-rocksdb/src/idempotency_table/mod.rs b/crates/storage-rocksdb/src/idempotency_table/mod.rs index 18c3e2513..fcf996f62 100644 --- a/crates/storage-rocksdb/src/idempotency_table/mod.rs +++ b/crates/storage-rocksdb/src/idempotency_table/mod.rs @@ -63,7 +63,9 @@ fn all_idempotency_metadata( storage: &mut S, range: RangeInclusive, ) -> impl Stream> + Send + '_ { - let iter = storage.iterator_from(TableScan::PartitionKeyRange::(range)); + let iter = storage.iterator_from(TableScan::FullScanPartitionKeyRange::( + range, + )); stream::iter(OwnedIterator::new(iter).map(|(mut k, mut v)| { let key = IdempotencyKey::deserialize_from(&mut k)?; let idempotency_metadata = StorageCodec::decode::(&mut v) diff --git a/crates/storage-rocksdb/src/inbox_table/mod.rs b/crates/storage-rocksdb/src/inbox_table/mod.rs index 2c9fbb77a..f76cf277c 100644 --- a/crates/storage-rocksdb/src/inbox_table/mod.rs +++ b/crates/storage-rocksdb/src/inbox_table/mod.rs @@ -70,13 +70,16 @@ impl<'a> InboxTable for RocksDBTransaction<'a> { .service_name(service_id.service_name.clone()) .service_key(service_id.key.clone()); - self.get_first_blocking(TableScan::KeyPrefix(key), |kv| match kv { - Some((k, v)) => { - let entry = decode_inbox_key_value(k, v)?; - Ok(Some(entry)) - } - None => Ok(None), - }) + self.get_first_blocking( + TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), key), + |kv| match kv { + Some((k, v)) => { + let entry = decode_inbox_key_value(k, v)?; + Ok(Some(entry)) + } + None => Ok(None), + }, + ) } async fn pop_inbox( @@ -102,12 +105,13 @@ impl<'a> InboxTable for RocksDBTransaction<'a> { .service_name(service_id.service_name.clone()) .service_key(service_id.key.clone()); - stream::iter( - self.for_each_key_value_in_place(TableScan::KeyPrefix(key), |k, v| { + stream::iter(self.for_each_key_value_in_place( + TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), key), + |k, v| { let inbox_entry = decode_inbox_key_value(k, v); TableScanIterationDecision::Emit(inbox_entry) - }), - ) + }, + )) } fn all_inboxes( @@ -115,7 +119,7 @@ impl<'a> InboxTable for RocksDBTransaction<'a> { range: RangeInclusive, ) -> impl Stream> + Send { stream::iter(self.for_each_key_value_in_place( - TableScan::PartitionKeyRange::(range), + TableScan::FullScanPartitionKeyRange::(range), |k, v| { let inbox_entry = decode_inbox_key_value(k, v); TableScanIterationDecision::Emit(inbox_entry) diff --git a/crates/storage-rocksdb/src/invocation_status_table/mod.rs b/crates/storage-rocksdb/src/invocation_status_table/mod.rs index d0cbecd71..0c6b9675d 100644 --- a/crates/storage-rocksdb/src/invocation_status_table/mod.rs +++ b/crates/storage-rocksdb/src/invocation_status_table/mod.rs @@ -10,7 +10,7 @@ use crate::keys::{define_table_key, KeyKind, TableKey}; use crate::owned_iter::OwnedIterator; -use crate::TableScan::PartitionKeyRange; +use crate::TableScan::FullScanPartitionKeyRange; use crate::{RocksDBStorage, TableKind, TableScanIterationDecision}; use crate::{RocksDBTransaction, StorageAccess}; use futures::Stream; @@ -87,7 +87,7 @@ fn invoked_invocations( partition_key_range: RangeInclusive, ) -> Vec> { storage.for_each_key_value_in_place( - PartitionKeyRange::(partition_key_range), + FullScanPartitionKeyRange::(partition_key_range), |mut k, mut v| { let result = read_invoked_full_invocation_id(&mut k, &mut v).transpose(); if let Some(res) = result { @@ -172,7 +172,7 @@ impl RocksDBStorage { &self, range: RangeInclusive, ) -> impl Iterator + '_ { - let iter = self.iterator_from(PartitionKeyRange::(range)); + let iter = self.iterator_from(FullScanPartitionKeyRange::(range)); OwnedIterator::new(iter).map(|(mut key, mut value)| { let state_key = InvocationStatusKey::deserialize_from(&mut key).unwrap(); let state_value = StorageCodec::decode::(&mut value).unwrap(); diff --git a/crates/storage-rocksdb/src/journal_table/mod.rs b/crates/storage-rocksdb/src/journal_table/mod.rs index b1179c89e..1beef0f50 100644 --- a/crates/storage-rocksdb/src/journal_table/mod.rs +++ b/crates/storage-rocksdb/src/journal_table/mod.rs @@ -11,7 +11,7 @@ use crate::keys::TableKey; use crate::keys::{define_table_key, KeyKind}; use crate::owned_iter::OwnedIterator; -use crate::scan::TableScan::PartitionKeyRange; +use crate::scan::TableScan::FullScanPartitionKeyRange; use crate::TableKind::Journal; use crate::{RocksDBStorage, RocksDBTransaction, StorageAccess}; use crate::{TableScan, TableScanIterationDecision}; @@ -74,24 +74,27 @@ fn get_journal( .invocation_uuid(invocation_id.invocation_uuid()); let mut n = 0; - storage.for_each_key_value_in_place(TableScan::KeyPrefix(key), move |k, mut v| { - let key = JournalKey::deserialize_from(&mut Cursor::new(k)).map(|journal_key| { - journal_key - .journal_index - .expect("The journal index must be part of the journal key.") - }); - let entry = StorageCodec::decode::(&mut v) - .map_err(|error| StorageError::Generic(error.into())); - - let result = key.and_then(|key| entry.map(|entry| (key, entry))); - - n += 1; - if n < journal_length { - TableScanIterationDecision::Emit(result) - } else { - TableScanIterationDecision::BreakWith(result) - } - }) + storage.for_each_key_value_in_place( + TableScan::SinglePartitionKeyPrefix(invocation_id.partition_key(), key), + move |k, mut v| { + let key = JournalKey::deserialize_from(&mut Cursor::new(k)).map(|journal_key| { + journal_key + .journal_index + .expect("The journal index must be part of the journal key.") + }); + let entry = StorageCodec::decode::(&mut v) + .map_err(|error| StorageError::Generic(error.into())); + + let result = key.and_then(|key| entry.map(|entry| (key, entry))); + + n += 1; + if n < journal_length { + TableScanIterationDecision::Emit(result) + } else { + TableScanIterationDecision::BreakWith(result) + } + }, + ) } fn delete_journal( @@ -170,7 +173,7 @@ impl RocksDBStorage { &self, range: RangeInclusive, ) -> impl Iterator + '_ { - let iter = self.iterator_from(PartitionKeyRange::(range)); + let iter = self.iterator_from(FullScanPartitionKeyRange::(range)); OwnedIterator::new(iter).map(|(mut key, mut value)| { let journal_key = JournalKey::deserialize_from(&mut key) .expect("journal key must deserialize into JournalKey"); diff --git a/crates/storage-rocksdb/src/keys.rs b/crates/storage-rocksdb/src/keys.rs index 4a2314775..021a3f931 100644 --- a/crates/storage-rocksdb/src/keys.rs +++ b/crates/storage-rocksdb/src/keys.rs @@ -20,7 +20,9 @@ use strum_macros::EnumIter; /// # Important /// There must exist a bijective mapping between the enum variant and its byte representation. /// See [`KeyKind::as_bytes`] and [`KeyKind::from_bytes`]. -#[derive(Debug, Copy, Clone, Eq, PartialEq, EnumIter, derive_more::Display)] +#[derive( + Debug, Copy, Clone, Eq, PartialEq, EnumIter, derive_more::Display, strum_macros::VariantArray, +)] pub enum KeyKind { Deduplication, Fsm, @@ -37,6 +39,17 @@ pub enum KeyKind { impl KeyKind { pub const SERIALIZED_LENGTH: usize = 2; + pub const fn exclusive_upper_bound(&self) -> [u8; Self::SERIALIZED_LENGTH] { + let start = self.as_bytes(); + let num = u16::from_be_bytes(*start); + let num = if num == u16::MAX { + panic!("key kind to not saturate u16"); + } else { + num + 1 + }; + num.to_be_bytes() + } + /// A once assigned byte representation to a key kind variant must never be changed! Instead, /// create a new variant representing a new key. /// @@ -45,7 +58,9 @@ impl KeyKind { /// ```ignore /// KeyKind::from_bytes(key_kind.as_bytes()) == key_kind /// ``` - fn as_bytes(&self) -> &[u8; Self::SERIALIZED_LENGTH] { + pub const fn as_bytes(&self) -> &'static [u8; Self::SERIALIZED_LENGTH] { + // NOTE: do not use &[0xff, 0xff] as key byte prefix, ever! + // We should always be able to +1 the those bytes when interpreted as u16 match self { KeyKind::Deduplication => b"de", KeyKind::Fsm => b"fs", @@ -68,7 +83,7 @@ impl KeyKind { /// ```ignore /// KeyKind::from_bytes(key_kind.as_bytes()) == key_kind /// ``` - fn from_bytes(bytes: &[u8; Self::SERIALIZED_LENGTH]) -> Option { + pub const fn from_bytes(bytes: &[u8; Self::SERIALIZED_LENGTH]) -> Option { match bytes { b"de" => Some(KeyKind::Deduplication), b"fs" => Some(KeyKind::Fsm), @@ -101,12 +116,13 @@ impl KeyKind { } } -pub trait TableKey: Sized + Send + 'static { +pub trait TableKey: Sized + std::fmt::Debug + Send + 'static { + const TABLE: TableKind; + const KEY_KIND: KeyKind; fn is_complete(&self) -> bool; fn serialize_key_kind(bytes: &mut B); fn serialize_to(&self, bytes: &mut B); fn deserialize_from(bytes: &mut B) -> crate::Result; - fn table() -> TableKind; fn serialize(&self) -> BytesMut { let mut buf = BytesMut::with_capacity(self.serialized_length()); @@ -115,7 +131,6 @@ pub trait TableKey: Sized + Send + 'static { } fn serialized_length(&self) -> usize; - fn serialized_key_kind_length() -> usize; } /// The following macro defines an ordered, named key tuple, that is used as a rocksdb key. @@ -199,10 +214,6 @@ pub trait TableKey: Sized + Send + 'static { /// return Ok(this); /// } /// -/// fn serialized_key_kind_length() -> usize { -/// KeyKind::SERIALIZED_LENGTH -/// } -/// /// fn table() -> TableKind { /// FooBarTable /// } @@ -236,11 +247,8 @@ macro_rules! define_table_key { // serde impl crate::keys::TableKey for $key_name { - - #[inline] - fn table() -> crate::TableKind { - $table_kind - } + const TABLE: crate::TableKind = $table_kind; + const KEY_KIND: $crate::keys::KeyKind = $key_kind; fn is_complete(&self) -> bool { $( @@ -284,17 +292,12 @@ macro_rules! define_table_key { #[inline] fn serialized_length(&self) -> usize { // we always need space for the key kind - let mut serialized_length = Self::serialized_key_kind_length(); + let mut serialized_length = $crate::keys::KeyKind::SERIALIZED_LENGTH; $( serialized_length += $crate::keys::KeyCodec::serialized_length(&self.$element); )+ serialized_length } - - #[inline] - fn serialized_key_kind_length() -> usize { - $crate::keys::KeyKind::SERIALIZED_LENGTH - } } }) } diff --git a/crates/storage-rocksdb/src/lib.rs b/crates/storage-rocksdb/src/lib.rs index 92b21f4ab..11cd48fbd 100644 --- a/crates/storage-rocksdb/src/lib.rs +++ b/crates/storage-rocksdb/src/lib.rs @@ -28,8 +28,20 @@ use crate::TableKind::{ Deduplication, Idempotency, Inbox, InvocationStatus, Journal, Outbox, PartitionStateMachine, ServiceStatus, State, Timers, }; -use bytes::BytesMut; + +use std::sync::Arc; + +use bytes::{Bytes, BytesMut}; use codederror::CodedError; +use rocksdb::DBCompressionType; +use rocksdb::DBPinnableSlice; +use rocksdb::DBRawIteratorWithThreadMode; +use rocksdb::MultiThreaded; +use rocksdb::PrefixRange; +use rocksdb::ReadOptions; +use rocksdb::{BoundColumnFamily, SliceTransform}; +use static_assertions::const_assert_eq; + use restate_core::ShutdownError; use restate_rocksdb::{ CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDbManager, RocksError, @@ -37,15 +49,10 @@ use restate_rocksdb::{ use restate_storage_api::{Storage, StorageError, Transaction}; use restate_types::arc_util::Updateable; use restate_types::config::{RocksDbOptions, StorageOptions}; +use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; -use rocksdb::BoundColumnFamily; -use rocksdb::DBCompressionType; -use rocksdb::DBPinnableSlice; -use rocksdb::DBRawIteratorWithThreadMode; -use rocksdb::MultiThreaded; -use rocksdb::PrefixRange; -use rocksdb::ReadOptions; -use std::sync::Arc; + +use self::keys::KeyKind; pub type DB = rocksdb::OptimisticTransactionDB; type TransactionDB<'a> = rocksdb::Transaction<'a, DB>; @@ -56,16 +63,20 @@ pub type DBIteratorTransaction<'b> = DBRawIteratorWithThreadMode<'b, rocksdb::Tr // matches the default directory name const DB_NAME: &str = "db"; -const STATE_TABLE_NAME: &str = "state"; -const INVOCATION_STATUS_TABLE_NAME: &str = "invocation_status"; -const SERVICE_STATUS_TABLE_NAME: &str = "service_status"; -const IDEMPOTENCY_TABLE_NAME: &str = "idempotency"; -const INBOX_TABLE_NAME: &str = "inbox"; -const OUTBOX_TABLE_NAME: &str = "outbox"; -const DEDUP_TABLE_NAME: &str = "dedup"; -const FSM_TABLE_NAME: &str = "fsm"; -const TIMERS_TABLE_NAME: &str = "timers"; -const JOURNAL_TABLE_NAME: &str = "journal"; +pub const PARTITION_CF: &str = "data-unpartitioned"; + +//Key prefix is 10 bytes (KeyKind(2) + PartitionKey/Id(8)) +const DB_PREFIX_LENGTH: usize = KeyKind::SERIALIZED_LENGTH + std::mem::size_of::(); + +// If this changes, we need to know. +const_assert_eq!(DB_PREFIX_LENGTH, 10); + +// Ensures that both types have the same length, this makes it possible to +// share prefix extractor in rocksdb. +const_assert_eq!( + std::mem::size_of::(), + std::mem::size_of::(), +); pub(crate) type Result = std::result::Result; @@ -77,40 +88,27 @@ pub enum TableScanIterationDecision { } #[inline] -const fn cf_name(kind: TableKind) -> &'static str { - match kind { - State => STATE_TABLE_NAME, - InvocationStatus => INVOCATION_STATUS_TABLE_NAME, - ServiceStatus => SERVICE_STATUS_TABLE_NAME, - Inbox => INBOX_TABLE_NAME, - Outbox => OUTBOX_TABLE_NAME, - Deduplication => DEDUP_TABLE_NAME, - PartitionStateMachine => FSM_TABLE_NAME, - Timers => TIMERS_TABLE_NAME, - Journal => JOURNAL_TABLE_NAME, - Idempotency => IDEMPOTENCY_TABLE_NAME, - } +const fn cf_name(_kind: TableKind) -> &'static str { + PARTITION_CF } #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum TableKind { + // By Partition ID + PartitionStateMachine, + Deduplication, + Outbox, + Timers, + // By Partition Key State, InvocationStatus, ServiceStatus, Idempotency, Inbox, - Outbox, - Deduplication, - PartitionStateMachine, - Timers, Journal, } impl TableKind { - pub const fn cf_name(&self) -> &'static str { - cf_name(*self) - } - pub fn all() -> core::slice::Iter<'static, TableKind> { static VARIANTS: &[TableKind] = &[ State, @@ -126,6 +124,37 @@ impl TableKind { ]; VARIANTS.iter() } + + pub const fn key_kinds(self) -> &'static [KeyKind] { + match self { + State => &[KeyKind::State], + InvocationStatus => &[KeyKind::InvocationStatus], + ServiceStatus => &[KeyKind::ServiceStatus], + Idempotency => &[KeyKind::Idempotency], + Inbox => &[KeyKind::Inbox], + Outbox => &[KeyKind::Outbox], + Deduplication => &[KeyKind::Deduplication], + PartitionStateMachine => &[KeyKind::Fsm], + Timers => &[KeyKind::Timers], + Journal => &[KeyKind::Journal], + } + } + + pub fn has_key_kind(self, prefix: &[u8]) -> bool { + self.extract_key_kind(prefix).is_some() + } + + pub fn extract_key_kind(self, prefix: &[u8]) -> Option { + if prefix.len() < KeyKind::SERIALIZED_LENGTH { + return None; + } + let slice = prefix[..KeyKind::SERIALIZED_LENGTH].try_into().unwrap(); + let Some(kind) = KeyKind::from_bytes(slice) else { + // warning + return None; + }; + self.key_kinds().iter().find(|k| **k == kind).copied() + } } #[derive(Debug, thiserror::Error, CodedError)] @@ -189,6 +218,10 @@ fn db_options() -> rocksdb::Options { } fn cf_options(mut cf_options: rocksdb::Options) -> rocksdb::Options { + // Actually, we would love to use CappedPrefixExtractor but unfortunately it's neither exposed + // in the C API nor the rust binding. That's okay and we can change it later. + cf_options.set_prefix_extractor(SliceTransform::create_fixed_prefix(DB_PREFIX_LENGTH)); + cf_options.set_memtable_prefix_bloom_ratio(0.2); // Most of the changes are highly temporal, we try to delay flushing // As much as we can to increase the chances to observe a deletion. // @@ -222,26 +255,7 @@ impl RocksDBStorage { mut storage_opts: impl Updateable + Send + 'static, updateable_opts: impl Updateable + Send + 'static, ) -> std::result::Result { - let cfs = vec![ - // - // keyed by partition key + user key - // - CfName::new(cf_name(Inbox)), - CfName::new(cf_name(State)), - CfName::new(cf_name(InvocationStatus)), - CfName::new(cf_name(ServiceStatus)), - CfName::new(cf_name(Journal)), - CfName::new(cf_name(Idempotency)), - // - // keyed by partition id + suffix - // - CfName::new(cf_name(Outbox)), - CfName::new(cf_name(Timers)), - // keyed by partition_id + partition_id - CfName::new(cf_name(Deduplication)), - // keyed by partition_id + u64 - CfName::new(cf_name(PartitionStateMachine)), - ]; + let cfs = vec![CfName::new(PARTITION_CF)]; let options = storage_opts.load(); let db_spec = DbSpecBuilder::new( @@ -275,42 +289,75 @@ impl RocksDBStorage { ) } - fn prefix_iterator>>(&self, table: TableKind, prefix: K) -> DBIterator { + fn prefix_iterator(&self, table: TableKind, _key_kind: KeyKind, prefix: Bytes) -> DBIterator { let table = self.table_handle(table); let mut opts = ReadOptions::default(); - - opts.set_iterate_range(PrefixRange(prefix)); - - self.db.raw_iterator_cf_opt(&table, opts) + opts.set_prefix_same_as_start(true); + opts.set_iterate_range(PrefixRange(prefix.clone())); + opts.set_async_io(true); + opts.set_total_order_seek(false); + let mut it = self.db.raw_iterator_cf_opt(&table, opts); + it.seek(prefix); + it } - fn range_iterator(&self, table: TableKind, range: impl rocksdb::IterateBounds) -> DBIterator { + fn range_iterator( + &self, + table: TableKind, + _key: KeyKind, + scan_mode: ScanMode, + from: Bytes, + to: Bytes, + ) -> DBIterator { let table = self.table_handle(table); let mut opts = ReadOptions::default(); - opts.set_iterate_range(range); - self.db.raw_iterator_cf_opt(&table, opts) + // todo: use auto_prefix_mode, at the moment, rocksdb doesn't expose this through the C + // binding. + opts.set_total_order_seek(scan_mode == ScanMode::TotalOrder); + opts.set_iterate_range(from.clone()..to); + opts.set_async_io(true); + + let mut it = self.db.raw_iterator_cf_opt(&table, opts); + it.seek(from); + it } + #[track_caller] fn iterator_from( &self, scan: TableScan, ) -> DBRawIteratorWithThreadMode<'_, DB> { let scan: PhysicalScan = scan.into(); match scan { - PhysicalScan::Prefix(table, prefix) => { - let mut it = self.prefix_iterator(table, prefix.clone()); - it.seek(prefix); - it + PhysicalScan::Prefix(table, key_kind, prefix) => { + assert!(table.has_key_kind(&prefix)); + self.prefix_iterator(table, key_kind, prefix.freeze()) } - PhysicalScan::RangeExclusive(table, start, end) => { - let mut it = self.range_iterator(table, start.clone()..end); - it.seek(start); - it + PhysicalScan::RangeExclusive(table, key_kind, scan_mode, start, end) => { + assert!(table.has_key_kind(&start)); + self.range_iterator(table, key_kind, scan_mode, start.freeze(), end.freeze()) } - PhysicalScan::RangeOpen(table, start) => { - let mut it = self.range_iterator(table, start.clone()..); - it.seek(start); - it + PhysicalScan::RangeOpen(table, key_kind, start) => { + // We delayed the generate the synthetic iterator upper bound until this point + // because we might have different prefix length requirements based on the + // table+key_kind combination and we should keep this knowledge as low-level as + // possible. + // + // make the end has the same length as all prefixes to ensure rocksdb key + // comparator can leverage bloom filters when applicable + // (if auto_prefix_mode is enabled) + let mut end = BytesMut::zeroed(DB_PREFIX_LENGTH); + // We want to ensure that Range scans fall within the same key kind. + // So, we limit the iterator to the upper bound of this prefix + let kind_upper_bound = K::KEY_KIND.exclusive_upper_bound(); + end[..kind_upper_bound.len()].copy_from_slice(&kind_upper_bound); + self.range_iterator( + table, + key_kind, + ScanMode::TotalOrder, + start.freeze(), + end.freeze(), + ) } } } @@ -390,31 +437,56 @@ pub struct RocksDBTransaction<'a> { value_buffer: &'a mut BytesMut, } +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum ScanMode { + /// Scan is bound to a single fixed key prefix (partition id, or a single partition key). + WithinPrefix, + /// Scan/iterator requires total order seek, this means that the iterator is not bound to a + /// fixed prefix that matches the column family prefix extractor length. For instance, if + /// scanning data across multiple partition IDs or multiple partition keys. + TotalOrder, +} + impl<'a> RocksDBTransaction<'a> { - pub fn prefix_iterator>>( + pub(crate) fn prefix_iterator( &self, table: TableKind, - prefix: K, + _key_kind: KeyKind, + prefix: Bytes, ) -> DBIteratorTransaction { let table = self.table_handle(table); let mut opts = ReadOptions::default(); - opts.set_iterate_range(PrefixRange(prefix)); + opts.set_iterate_range(PrefixRange(prefix.clone())); + opts.set_prefix_same_as_start(true); + opts.set_async_io(true); + opts.set_total_order_seek(false); - self.txn.raw_iterator_cf_opt(&table, opts) + let mut it = self.txn.raw_iterator_cf_opt(&table, opts); + it.seek(prefix); + it } - pub fn range_iterator( + pub(crate) fn range_iterator( &self, table: TableKind, - range: impl rocksdb::IterateBounds, + _key_kind: KeyKind, + scan_mode: ScanMode, + from: Bytes, + to: Bytes, ) -> DBIteratorTransaction { let table = self.table_handle(table); let mut opts = ReadOptions::default(); - opts.set_iterate_range(range); - self.txn.raw_iterator_cf_opt(&table, opts) + // todo: use auto_prefix_mode, at the moment, rocksdb doesn't expose this through the C + // binding. + opts.set_total_order_seek(scan_mode == ScanMode::TotalOrder); + opts.set_iterate_range(from.clone()..to); + opts.set_async_io(true); + let mut it = self.txn.raw_iterator_cf_opt(&table, opts); + it.seek(from); + it } - fn table_handle(&self, table_kind: TableKind) -> Arc { + pub(crate) fn table_handle(&self, table_kind: TableKind) -> Arc { self.db.cf_handle(cf_name(table_kind)).expect( "This should not happen, this is a Restate bug. Please contact the restate developers.", ) @@ -449,20 +521,33 @@ impl<'a> StorageAccess for RocksDBTransaction<'a> { ) -> DBRawIteratorWithThreadMode<'_, Self::DBAccess<'_>> { let scan: PhysicalScan = scan.into(); match scan { - PhysicalScan::Prefix(table, prefix) => { - let mut it = self.prefix_iterator(table, prefix.clone()); - it.seek(prefix); - it + PhysicalScan::Prefix(table, key_kind, prefix) => { + self.prefix_iterator(table, key_kind, prefix.freeze()) } - PhysicalScan::RangeExclusive(table, start, end) => { - let mut it = self.range_iterator(table, start.clone()..end); - it.seek(start); - it + PhysicalScan::RangeExclusive(table, key_kind, scan_mode, start, end) => { + self.range_iterator(table, key_kind, scan_mode, start.freeze(), end.freeze()) } - PhysicalScan::RangeOpen(table, start) => { - let mut it = self.range_iterator(table, start.clone()..); - it.seek(start); - it + PhysicalScan::RangeOpen(table, key_kind, start) => { + // We delayed the generate the synthetic iterator upper bound until this point + // because we might have different prefix length requirements based on the + // table+key_kind combination and we should keep this knowledge as low-level as + // possible. + // + // make the end has the same length as all prefixes to ensure rocksdb key + // comparator can leverage bloom filters when applicable + // (if auto_prefix_mode is enabled) + let mut end = BytesMut::zeroed(DB_PREFIX_LENGTH); + // We want to ensure that Range scans fall within the same key kind. + // So, we limit the iterator to the upper bound of this prefix + let kind_upper_bound = K::KEY_KIND.exclusive_upper_bound(); + end[..kind_upper_bound.len()].copy_from_slice(&kind_upper_bound); + self.range_iterator( + table, + key_kind, + ScanMode::WithinPrefix, + start.freeze(), + end.freeze(), + ) } } } @@ -528,7 +613,7 @@ trait StorageAccess { key.serialize_to(key_buffer); let key_buffer = key_buffer.split(); - self.put_cf(K::table(), key_buffer, value); + self.put_cf(K::TABLE, key_buffer, value); } #[inline] @@ -541,7 +626,7 @@ trait StorageAccess { StorageCodec::encode(&value, value_buffer).unwrap(); let value_buffer = value_buffer.split(); - self.put_cf(K::table(), key_buffer, value_buffer); + self.put_cf(K::TABLE, key_buffer, value_buffer); } #[inline] @@ -550,7 +635,7 @@ trait StorageAccess { key.serialize_to(buffer); let buffer = buffer.split(); - self.delete_cf(K::table(), buffer); + self.delete_cf(K::TABLE, buffer); } #[inline] @@ -563,7 +648,7 @@ trait StorageAccess { key.serialize_to(&mut buf); let buf = buf.split(); - match self.get(K::table(), &buf) { + match self.get(K::TABLE, &buf) { Ok(value) => { let slice = value.as_ref().map(|v| v.as_ref()); @@ -600,7 +685,7 @@ trait StorageAccess { key.serialize_to(&mut buf); let buf = buf.split(); - match self.get(K::table(), &buf) { + match self.get(K::TABLE, &buf) { Ok(value) => { let slice = value.as_ref().map(|v| v.as_ref()); f(&buf, slice) diff --git a/crates/storage-rocksdb/src/outbox_table/mod.rs b/crates/storage-rocksdb/src/outbox_table/mod.rs index 1030165f8..e36442da4 100644 --- a/crates/storage-rocksdb/src/outbox_table/mod.rs +++ b/crates/storage-rocksdb/src/outbox_table/mod.rs @@ -51,14 +51,17 @@ fn get_next_outbox_message( .partition_id(partition_id) .message_index(u64::MAX); - storage.get_first_blocking(TableScan::KeyRangeInclusive(start, end), |kv| { - if let Some((k, v)) = kv { - let t = decode_key_value(k, v)?; - Ok(Some(t)) - } else { - Ok(None) - } - }) + storage.get_first_blocking( + TableScan::KeyRangeInclusiveInSinglePartition(partition_id, start, end), + |kv| { + if let Some((k, v)) = kv { + let t = decode_key_value(k, v)?; + Ok(Some(t)) + } else { + Ok(None) + } + }, + ) } fn get_outbox_message( diff --git a/crates/storage-rocksdb/src/scan.rs b/crates/storage-rocksdb/src/scan.rs index 3e518edee..d318093b2 100644 --- a/crates/storage-rocksdb/src/scan.rs +++ b/crates/storage-rocksdb/src/scan.rs @@ -8,67 +8,91 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::keys::{KeyCodec, TableKey}; -use crate::scan::TableScan::{KeyPrefix, KeyRangeInclusive, Partition, PartitionKeyRange}; -use crate::TableKind; +use crate::keys::{KeyCodec, KeyKind, TableKey}; +use crate::scan::TableScan::{ + FullScanPartitionKeyRange, KeyRangeInclusiveInSinglePartition, SinglePartition, + SinglePartitionKeyPrefix, +}; +use crate::{ScanMode, TableKind}; use bytes::BytesMut; use restate_types::identifiers::{PartitionId, PartitionKey}; use std::ops::RangeInclusive; +// Note: we take extra arguments like (PartitionId or PartitionKey) only to make sure that +// call-sites know what they are opting to. Those values might not actually be used to perform the +// query, albeit this might change at any time. +#[derive(Debug)] pub enum TableScan { /// Scan an entire partition of a given table. - Partition(PartitionId), - /// Scan an inclusive key-range. - PartitionKeyRange(RangeInclusive), - /// Key Prefix - KeyPrefix(K), - /// Inclusive Key Range - KeyRangeInclusive(K, K), + SinglePartition(PartitionId), + /// Scan an inclusive key-range potentially across partitions. + /// Requires total seek order + FullScanPartitionKeyRange(RangeInclusive), + /// Scan within a single partition key + SinglePartitionKeyPrefix(PartitionKey, K), + /// Inclusive Key Range in a single partition. + KeyRangeInclusiveInSinglePartition(PartitionId, K, K), } pub(crate) enum PhysicalScan { - Prefix(TableKind, BytesMut), - RangeExclusive(TableKind, BytesMut, BytesMut), - RangeOpen(TableKind, BytesMut), + Prefix(TableKind, KeyKind, BytesMut), + RangeExclusive(TableKind, KeyKind, ScanMode, BytesMut, BytesMut), + // Exclusively used for cross-partition full-scan queries. + RangeOpen(TableKind, KeyKind, BytesMut), } impl From> for PhysicalScan { fn from(scan: TableScan) -> Self { match scan { - KeyPrefix(key) => PhysicalScan::Prefix(K::table(), key.serialize()), - KeyRangeInclusive(start, end) => { + SinglePartitionKeyPrefix(_partition_key, key) => { + PhysicalScan::Prefix(K::TABLE, K::KEY_KIND, key.serialize()) + } + KeyRangeInclusiveInSinglePartition(_partition_id, start, end) => { let start = start.serialize(); let mut end = end.serialize(); if try_increment(&mut end) { - PhysicalScan::RangeExclusive(K::table(), start, end) + PhysicalScan::RangeExclusive( + K::TABLE, + K::KEY_KIND, + ScanMode::WithinPrefix, + start, + end, + ) } else { - PhysicalScan::RangeOpen(K::table(), start) + // not allowed to happen since we guarantee that KeyKind is + // always incrementable. + panic!("Key range end overflowed, start key {:x?}", &start); } } - Partition(partition_id) => { - let mut kind = BytesMut::with_capacity( - partition_id.serialized_length() + K::serialized_key_kind_length(), + SinglePartition(partition_id) => { + let mut prefix_start = BytesMut::with_capacity( + partition_id.serialized_length() + KeyKind::SERIALIZED_LENGTH, ); - K::serialize_key_kind(&mut kind); - partition_id.encode(&mut kind); - PhysicalScan::Prefix(K::table(), kind) + K::serialize_key_kind(&mut prefix_start); + partition_id.encode(&mut prefix_start); + PhysicalScan::Prefix(K::TABLE, K::KEY_KIND, prefix_start) } - PartitionKeyRange(range) => { + FullScanPartitionKeyRange(range) => { let (start, end) = (range.start(), range.end()); - let mut start_bytes = BytesMut::with_capacity( - start.serialized_length() + K::serialized_key_kind_length(), - ); + let mut start_bytes = + BytesMut::with_capacity(start.serialized_length() + KeyKind::SERIALIZED_LENGTH); K::serialize_key_kind(&mut start_bytes); start.encode(&mut start_bytes); match end.checked_add(1) { - None => PhysicalScan::RangeOpen(K::table(), start_bytes), + None => PhysicalScan::RangeOpen(K::TABLE, K::KEY_KIND, start_bytes), Some(end) => { let mut end_bytes = BytesMut::with_capacity( - end.serialized_length() + K::serialized_key_kind_length(), + end.serialized_length() + KeyKind::SERIALIZED_LENGTH, ); K::serialize_key_kind(&mut end_bytes); end.encode(&mut end_bytes); - PhysicalScan::RangeExclusive(K::table(), start_bytes, end_bytes) + PhysicalScan::RangeExclusive( + K::TABLE, + K::KEY_KIND, + ScanMode::TotalOrder, + start_bytes, + end_bytes, + ) } } } @@ -78,7 +102,7 @@ impl From> for PhysicalScan { impl TableScan { pub fn table(&self) -> TableKind { - K::table() + K::TABLE } } diff --git a/crates/storage-rocksdb/src/service_status_table/mod.rs b/crates/storage-rocksdb/src/service_status_table/mod.rs index e980ccb0d..55b6c1bcc 100644 --- a/crates/storage-rocksdb/src/service_status_table/mod.rs +++ b/crates/storage-rocksdb/src/service_status_table/mod.rs @@ -10,7 +10,7 @@ use crate::keys::{define_table_key, KeyKind, TableKey}; use crate::owned_iter::OwnedIterator; -use crate::TableScan::PartitionKeyRange; +use crate::TableScan::FullScanPartitionKeyRange; use crate::{RocksDBStorage, TableKind}; use crate::{RocksDBTransaction, StorageAccess}; use bytestring::ByteString; @@ -120,7 +120,7 @@ impl RocksDBStorage { &self, range: RangeInclusive, ) -> impl Iterator + '_ { - let iter = self.iterator_from(PartitionKeyRange::(range)); + let iter = self.iterator_from(FullScanPartitionKeyRange::(range)); OwnedIterator::new(iter).map(|(mut key, mut value)| { let state_key = ServiceStatusKey::deserialize_from(&mut key).unwrap(); let state_value = StorageCodec::decode::(&mut value).unwrap(); diff --git a/crates/storage-rocksdb/src/state_table/mod.rs b/crates/storage-rocksdb/src/state_table/mod.rs index e0ff99dd9..9bbbfad62 100644 --- a/crates/storage-rocksdb/src/state_table/mod.rs +++ b/crates/storage-rocksdb/src/state_table/mod.rs @@ -79,9 +79,10 @@ fn delete_all_user_state(storage: &mut S, service_id: &Service .service_name(service_id.service_name.clone()) .service_key(service_id.key.clone()); - let keys = storage.for_each_key_value_in_place(TableScan::KeyPrefix(prefix_key), |k, _| { - TableScanIterationDecision::Emit(Ok(Bytes::copy_from_slice(k))) - }); + let keys = storage.for_each_key_value_in_place( + TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), prefix_key), + |k, _| TableScanIterationDecision::Emit(Ok(Bytes::copy_from_slice(k))), + ); for k in keys { storage.delete_cf(State, &k?); @@ -108,9 +109,10 @@ fn get_all_user_states( .service_name(service_id.service_name.clone()) .service_key(service_id.key.clone()); - storage.for_each_key_value_in_place(TableScan::KeyPrefix(key), |k, v| { - TableScanIterationDecision::Emit(decode_user_state_key_value(k, v)) - }) + storage.for_each_key_value_in_place( + TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), key), + |k, v| TableScanIterationDecision::Emit(decode_user_state_key_value(k, v)), + ) } impl ReadOnlyStateTable for RocksDBStorage { @@ -195,7 +197,7 @@ impl RocksDBStorage { &self, range: RangeInclusive, ) -> impl Iterator + '_ { - let iter = self.iterator_from(TableScan::PartitionKeyRange::(range)); + let iter = self.iterator_from(TableScan::FullScanPartitionKeyRange::(range)); OwnedIterator::new(iter).map(|(mut key, value)| { let row_key = StateKey::deserialize_from(&mut key).unwrap(); OwnedStateRow { diff --git a/crates/storage-rocksdb/src/timer_table/mod.rs b/crates/storage-rocksdb/src/timer_table/mod.rs index 12630cd25..5a0b3177c 100644 --- a/crates/storage-rocksdb/src/timer_table/mod.rs +++ b/crates/storage-rocksdb/src/timer_table/mod.rs @@ -81,9 +81,9 @@ fn exclusive_start_key_range( .partition_id(partition_id) .timestamp(u64::MAX); - TableScan::KeyRangeInclusive(lower_bound, upper_bound) + TableScan::KeyRangeInclusiveInSinglePartition(partition_id, lower_bound, upper_bound) } else { - TableScan::Partition(partition_id) + TableScan::SinglePartition(partition_id) } } @@ -255,7 +255,7 @@ mod tests { assert!(less_than(&key_a_bytes, &key_b_bytes)); let (low, high) = match exclusive_start_key_range(1, Some(&key_a)) { - TableScan::KeyRangeInclusive(low, high) => (low, high), + TableScan::KeyRangeInclusiveInSinglePartition(1, low, high) => (low, high), _ => panic!(""), }; let low = low.serialize();