Skip to content

Commit

Permalink
[RocksDb] Tables consolidation
Browse files Browse the repository at this point in the history
This is one step in a series of changes to move to a one CF per partition-id. The change consolidates all partitions into a single CF `data-unpartitioned`.

Notes:
- This introduces a fixed key prefix to partition CF to improve bloom filter lookups. This change require some subtle handling of scans that spans multiple prefixes, this is handled and is covered by existing test cases.
- This adds the ability for a table to have multiple KeyKinds attached. Each TableKey has an associated kind, and the table (TableKind) has a set of KeyKinds associated.
- This is designed to allow future separation of tables, or even individual key kinds to separate CFs.
- Some values are intentionally passed but not used for (a) clarity at call site and (b) to help with follow-up refactoring.


Side note: cluster marker tests fail sporadically, this adds a flush after each temp file write in an attempt to fix them.
  • Loading branch information
AhmedSoliman committed Apr 29, 2024
1 parent 5a826ce commit f81ec58
Show file tree
Hide file tree
Showing 15 changed files with 365 additions and 230 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 21 additions & 12 deletions crates/node/src/cluster_marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ mod tests {
let mut file = OpenOptions::new().create(true).write(true).open(path)?;
serde_json::to_writer(&file, cluster_marker)?;
file.flush()?;
file.sync_all()?;

Ok(())
}
Expand Down Expand Up @@ -253,8 +254,8 @@ mod tests {
}

#[test]
fn cluster_marker_is_updated() {
let file = NamedTempFile::new().unwrap();
fn cluster_marker_is_updated() -> anyhow::Result<()> {
let mut file = NamedTempFile::new().unwrap();
let previous_version = Version::new(1, 1, 6);
let current_version = Version::new(1, 2, 3);

Expand All @@ -263,6 +264,7 @@ mod tests {
file.path(),
)
.unwrap();
file.flush()?;

validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
Expand All @@ -281,12 +283,13 @@ mod tests {
max_version: current_version,
cluster_name: CLUSTER_NAME.to_owned(),
}
)
);
Ok(())
}

#[test]
fn max_version_is_maintained() {
let file = NamedTempFile::new().unwrap();
fn max_version_is_maintained() -> anyhow::Result<()> {
let mut file = NamedTempFile::new().unwrap();
let max_version = Version::new(1, 2, 6);
let current_version = Version::new(1, 1, 3);

Expand All @@ -295,6 +298,7 @@ mod tests {
file.path(),
)
.unwrap();
file.flush()?;

validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
Expand All @@ -313,12 +317,13 @@ mod tests {
max_version,
cluster_name: CLUSTER_NAME.to_owned(),
}
)
);
Ok(())
}

#[test]
fn incompatible_cluster_name() {
let file = NamedTempFile::new().unwrap();
fn incompatible_cluster_name() -> anyhow::Result<()> {
let mut file = NamedTempFile::new().unwrap();
let max_version = Version::new(1, 2, 6);
let current_version = Version::new(1, 1, 3);

Expand All @@ -327,6 +332,7 @@ mod tests {
file.path(),
)
.unwrap();
file.flush()?;

let result = validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
Expand All @@ -337,12 +343,13 @@ mod tests {
assert!(matches!(
result,
Err(ClusterValidationError::IncorrectClusterName { .. })
))
));
Ok(())
}

#[test]
fn incompatible_version() {
let file = NamedTempFile::new().unwrap();
fn incompatible_version() -> anyhow::Result<()> {
let mut file = NamedTempFile::new().unwrap();
let max_version = Version::new(1, 2, 6);
let compatibility_boundary = Version::new(1, 1, 1);
let current_version = Version::new(1, 0, 3);
Expand All @@ -352,6 +359,7 @@ mod tests {
file.path(),
)
.unwrap();
file.flush()?;

let mut compatibility_map = COMPATIBILITY_MAP.deref().clone();
compatibility_map.insert(
Expand All @@ -368,6 +376,7 @@ mod tests {
assert!(matches!(
result,
Err(ClusterValidationError::IncompatibleVersion { .. })
))
));
Ok(())
}
}
4 changes: 2 additions & 2 deletions crates/storage-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ derive_more = { workspace = true }
drain = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
once_cell = "1.18.0"
paste = { workspace = true }
prost = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
sync_wrapper = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
once_cell = "1.18.0"
log = "0.4.20"

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/storage-rocksdb/src/deduplication_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn get_all_sequence_numbers<S: StorageAccess>(
partition_id: PartitionId,
) -> impl Stream<Item = Result<DedupInformation>> + Send {
stream::iter(storage.for_each_key_value_in_place(
TableScan::Partition::<DeduplicationKey>(partition_id),
TableScan::SinglePartition::<DeduplicationKey>(partition_id),
move |k, mut v| {
let key =
DeduplicationKey::deserialize_from(&mut Cursor::new(k)).map(|key| key.producer_id);
Expand Down
4 changes: 3 additions & 1 deletion crates/storage-rocksdb/src/idempotency_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ fn all_idempotency_metadata<S: StorageAccess>(
storage: &mut S,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<(IdempotencyId, IdempotencyMetadata)>> + Send + '_ {
let iter = storage.iterator_from(TableScan::PartitionKeyRange::<IdempotencyKey>(range));
let iter = storage.iterator_from(TableScan::FullScanPartitionKeyRange::<IdempotencyKey>(
range,
));
stream::iter(OwnedIterator::new(iter).map(|(mut k, mut v)| {
let key = IdempotencyKey::deserialize_from(&mut k)?;
let idempotency_metadata = StorageCodec::decode::<IdempotencyMetadata, _>(&mut v)
Expand Down
28 changes: 16 additions & 12 deletions crates/storage-rocksdb/src/inbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,16 @@ impl<'a> InboxTable for RocksDBTransaction<'a> {
.service_name(service_id.service_name.clone())
.service_key(service_id.key.clone());

self.get_first_blocking(TableScan::KeyPrefix(key), |kv| match kv {
Some((k, v)) => {
let entry = decode_inbox_key_value(k, v)?;
Ok(Some(entry))
}
None => Ok(None),
})
self.get_first_blocking(
TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), key),
|kv| match kv {
Some((k, v)) => {
let entry = decode_inbox_key_value(k, v)?;
Ok(Some(entry))
}
None => Ok(None),
},
)
}

async fn pop_inbox(
Expand All @@ -102,20 +105,21 @@ impl<'a> InboxTable for RocksDBTransaction<'a> {
.service_name(service_id.service_name.clone())
.service_key(service_id.key.clone());

stream::iter(
self.for_each_key_value_in_place(TableScan::KeyPrefix(key), |k, v| {
stream::iter(self.for_each_key_value_in_place(
TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), key),
|k, v| {
let inbox_entry = decode_inbox_key_value(k, v);
TableScanIterationDecision::Emit(inbox_entry)
}),
)
},
))
}

fn all_inboxes(
&mut self,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<SequenceNumberInboxEntry>> + Send {
stream::iter(self.for_each_key_value_in_place(
TableScan::PartitionKeyRange::<InboxKey>(range),
TableScan::FullScanPartitionKeyRange::<InboxKey>(range),
|k, v| {
let inbox_entry = decode_inbox_key_value(k, v);
TableScanIterationDecision::Emit(inbox_entry)
Expand Down
6 changes: 3 additions & 3 deletions crates/storage-rocksdb/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use crate::keys::{define_table_key, KeyKind, TableKey};
use crate::owned_iter::OwnedIterator;
use crate::TableScan::PartitionKeyRange;
use crate::TableScan::FullScanPartitionKeyRange;
use crate::{RocksDBStorage, TableKind, TableScanIterationDecision};
use crate::{RocksDBTransaction, StorageAccess};
use futures::Stream;
Expand Down Expand Up @@ -87,7 +87,7 @@ fn invoked_invocations<S: StorageAccess>(
partition_key_range: RangeInclusive<PartitionKey>,
) -> Vec<Result<(InvocationId, InvocationTarget)>> {
storage.for_each_key_value_in_place(
PartitionKeyRange::<InvocationStatusKey>(partition_key_range),
FullScanPartitionKeyRange::<InvocationStatusKey>(partition_key_range),
|mut k, mut v| {
let result = read_invoked_full_invocation_id(&mut k, &mut v).transpose();
if let Some(res) = result {
Expand Down Expand Up @@ -172,7 +172,7 @@ impl RocksDBStorage {
&self,
range: RangeInclusive<PartitionKey>,
) -> impl Iterator<Item = OwnedInvocationStatusRow> + '_ {
let iter = self.iterator_from(PartitionKeyRange::<InvocationStatusKey>(range));
let iter = self.iterator_from(FullScanPartitionKeyRange::<InvocationStatusKey>(range));
OwnedIterator::new(iter).map(|(mut key, mut value)| {
let state_key = InvocationStatusKey::deserialize_from(&mut key).unwrap();
let state_value = StorageCodec::decode::<InvocationStatus, _>(&mut value).unwrap();
Expand Down
43 changes: 23 additions & 20 deletions crates/storage-rocksdb/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use crate::keys::TableKey;
use crate::keys::{define_table_key, KeyKind};
use crate::owned_iter::OwnedIterator;
use crate::scan::TableScan::PartitionKeyRange;
use crate::scan::TableScan::FullScanPartitionKeyRange;
use crate::TableKind::Journal;
use crate::{RocksDBStorage, RocksDBTransaction, StorageAccess};
use crate::{TableScan, TableScanIterationDecision};
Expand Down Expand Up @@ -74,24 +74,27 @@ fn get_journal<S: StorageAccess>(
.invocation_uuid(invocation_id.invocation_uuid());

let mut n = 0;
storage.for_each_key_value_in_place(TableScan::KeyPrefix(key), move |k, mut v| {
let key = JournalKey::deserialize_from(&mut Cursor::new(k)).map(|journal_key| {
journal_key
.journal_index
.expect("The journal index must be part of the journal key.")
});
let entry = StorageCodec::decode::<JournalEntry, _>(&mut v)
.map_err(|error| StorageError::Generic(error.into()));

let result = key.and_then(|key| entry.map(|entry| (key, entry)));

n += 1;
if n < journal_length {
TableScanIterationDecision::Emit(result)
} else {
TableScanIterationDecision::BreakWith(result)
}
})
storage.for_each_key_value_in_place(
TableScan::SinglePartitionKeyPrefix(invocation_id.partition_key(), key),
move |k, mut v| {
let key = JournalKey::deserialize_from(&mut Cursor::new(k)).map(|journal_key| {
journal_key
.journal_index
.expect("The journal index must be part of the journal key.")
});
let entry = StorageCodec::decode::<JournalEntry, _>(&mut v)
.map_err(|error| StorageError::Generic(error.into()));

let result = key.and_then(|key| entry.map(|entry| (key, entry)));

n += 1;
if n < journal_length {
TableScanIterationDecision::Emit(result)
} else {
TableScanIterationDecision::BreakWith(result)
}
},
)
}

fn delete_journal<S: StorageAccess>(
Expand Down Expand Up @@ -170,7 +173,7 @@ impl RocksDBStorage {
&self,
range: RangeInclusive<PartitionKey>,
) -> impl Iterator<Item = OwnedJournalRow> + '_ {
let iter = self.iterator_from(PartitionKeyRange::<JournalKey>(range));
let iter = self.iterator_from(FullScanPartitionKeyRange::<JournalKey>(range));
OwnedIterator::new(iter).map(|(mut key, mut value)| {
let journal_key = JournalKey::deserialize_from(&mut key)
.expect("journal key must deserialize into JournalKey");
Expand Down
Loading

0 comments on commit f81ec58

Please sign in to comment.