Skip to content

Commit

Permalink
[rocksdb] Make perf level configurable
Browse files Browse the repository at this point in the history
The RocksDB perf level is now controlled by `common.rocksdb-perf-level`. The default is `enable-count`.

Also adds detailed RocksDB instrumentation.

***
  • Loading branch information
AhmedSoliman committed Jun 4, 2024
1 parent 4b2743d commit 5e5befa
Show file tree
Hide file tree
Showing 19 changed files with 256 additions and 44 deletions.
8 changes: 7 additions & 1 deletion crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,13 @@ impl LogStoreWriter {
);
let result = self
.rocksdb
.write_batch(Priority::High, IoMode::default(), write_opts, write_batch)
.write_batch(
"local-loglet-write-batch",
Priority::High,
IoMode::default(),
write_opts,
write_batch,
)
.await;

if let Err(e) = result {
Expand Down
12 changes: 10 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
use std::pin::pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{ready, Poll};
use std::task::Poll;

use bytes::{BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use restate_core::ShutdownError;
use restate_rocksdb::RocksDbPerfGuard;
use restate_types::logs::SequenceNumber;
use rocksdb::{DBRawIteratorWithThreadMode, DB};
use tokio_stream::wrappers::WatchStream;
Expand Down Expand Up @@ -118,13 +119,20 @@ impl Stream for LocalLogletReadStream {

let next_offset = self.read_pointer.next();

let perf_guard = RocksDbPerfGuard::new("local-loglet-next");
loop {
let mut this = self.as_mut().project();

// Are we reading after commit offset?
// We are at tail. We need to wait until new records have been released.
if next_offset > *this.release_pointer {
let updated_release_pointer = ready!(this.release_watch.poll_next(cx));
let updated_release_pointer = match this.release_watch.poll_next(cx) {
Poll::Ready(t) => t,
Poll::Pending => {
perf_guard.forget();
return Poll::Pending;
}
};

match updated_release_pointer {
Some(updated_release_pointer) => {
Expand Down
8 changes: 7 additions & 1 deletion crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,13 @@ impl LocalMetadataStore {
wb.put_cf(&cf_handle, key, self.buffer.as_ref());
Ok(self
.rocksdb
.write_batch(Priority::High, IoMode::default(), write_options, wb)
.write_batch(
"local-metadata-write-batch",
Priority::High,
IoMode::default(),
write_options,
wb,
)
.await?)
}

Expand Down
2 changes: 2 additions & 0 deletions crates/partition-store/src/deduplication_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
};
use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::deduplication_table::{
DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId,
ReadOnlyDeduplicationTable,
Expand All @@ -35,6 +36,7 @@ fn get_dedup_sequence_number<S: StorageAccess>(
partition_id: PartitionId,
producer_id: &ProducerId,
) -> Result<Option<DedupSequenceNumber>> {
let _x = RocksDbPerfGuard::new("get-dedup-seq");
let key = DeduplicationKey::default()
.partition_id(partition_id)
.producer_id(producer_id.clone());
Expand Down
3 changes: 3 additions & 0 deletions crates/partition-store/src/inbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{TableScan, TableScanIterationDecision};
use bytestring::ByteString;
use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::inbox_table::{InboxEntry, InboxTable, SequenceNumberInboxEntry};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{PartitionKey, ServiceId, WithPartitionKey};
Expand Down Expand Up @@ -86,6 +87,8 @@ impl<'a> InboxTable for RocksDBTransaction<'a> {
&mut self,
service_id: &ServiceId,
) -> Result<Option<SequenceNumberInboxEntry>> {
// safe since delete_inbox_entry is not really async.
let _x = RocksDbPerfGuard::new("pop-inbox");
let result = self.peek_inbox(service_id).await;

if let Ok(Some(inbox_entry)) = &result {
Expand Down
3 changes: 3 additions & 0 deletions crates/partition-store/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{PartitionStore, TableKind, TableScanIterationDecision};
use crate::{RocksDBTransaction, StorageAccess};
use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::invocation_status_table::{
InvocationStatus, InvocationStatusTable, ReadOnlyInvocationStatusTable,
};
Expand Down Expand Up @@ -70,6 +71,7 @@ fn get_invocation_status<S: StorageAccess>(
storage: &mut S,
invocation_id: &InvocationId,
) -> Result<InvocationStatus> {
let _x = RocksDbPerfGuard::new("get-invocation-status");
let key = write_invocation_status_key(invocation_id);

storage
Expand All @@ -86,6 +88,7 @@ fn invoked_invocations<S: StorageAccess>(
storage: &mut S,
partition_key_range: RangeInclusive<PartitionKey>,
) -> Vec<Result<(InvocationId, InvocationTarget)>> {
let _x = RocksDbPerfGuard::new("invoked-invocations");
storage.for_each_key_value_in_place(
FullScanPartitionKeyRange::<InvocationStatusKey>(partition_key_range),
|mut k, mut v| {
Expand Down
5 changes: 5 additions & 0 deletions crates/partition-store/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{PartitionStore, RocksDBTransaction, StorageAccess};
use crate::{TableScan, TableScanIterationDecision};
use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::journal_table::{JournalEntry, JournalTable, ReadOnlyJournalTable};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{
Expand Down Expand Up @@ -69,6 +70,7 @@ fn get_journal<S: StorageAccess>(
invocation_id: &InvocationId,
journal_length: EntryIndex,
) -> Vec<Result<(EntryIndex, JournalEntry)>> {
let _x = RocksDbPerfGuard::new("get-journal");
let key = JournalKey::default()
.partition_key(invocation_id.partition_key())
.invocation_uuid(invocation_id.invocation_uuid());
Expand Down Expand Up @@ -116,6 +118,7 @@ impl ReadOnlyJournalTable for PartitionStore {
invocation_id: &InvocationId,
journal_index: u32,
) -> Result<Option<JournalEntry>> {
let _x = RocksDbPerfGuard::new("get-journal-entry");
get_journal_entry(self, invocation_id, journal_index)
}

Expand All @@ -134,6 +137,7 @@ impl<'a> ReadOnlyJournalTable for RocksDBTransaction<'a> {
invocation_id: &InvocationId,
journal_index: u32,
) -> Result<Option<JournalEntry>> {
let _x = RocksDbPerfGuard::new("get-journal-entry");
get_journal_entry(self, invocation_id, journal_index)
}

Expand All @@ -157,6 +161,7 @@ impl<'a> JournalTable for RocksDBTransaction<'a> {
}

/// Forwards to the free function `delete_journal`, passing this transaction together with
/// `invocation_id` and `journal_length` unchanged.
///
/// A `RocksDbPerfGuard` labelled `"delete-journal"` is created before the call and stays
/// alive until this method returns.
/// NOTE(review): presumably the guard records RocksDB perf statistics under that label when
/// it is dropped — confirm against `restate_rocksdb::RocksDbPerfGuard`.
async fn delete_journal(&mut self, invocation_id: &InvocationId, journal_length: EntryIndex) {
// Bound to a named variable (`_x`, not `_`) so the guard lives until the end of the scope
// instead of being dropped immediately.
let _x = RocksDbPerfGuard::new("delete-journal");
delete_journal(self, invocation_id, journal_length)
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/partition-store/src/outbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::keys::{define_table_key, KeyKind, TableKey};
use crate::TableKind::Outbox;
use crate::{PartitionStore, RocksDBTransaction, StorageAccess, TableScan};

use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -43,6 +44,7 @@ fn get_next_outbox_message<S: StorageAccess>(
partition_id: PartitionId,
next_sequence_number: u64,
) -> Result<Option<(u64, OutboxMessage)>> {
let _x = RocksDbPerfGuard::new("get-next-outbox");
let start = OutboxKey::default()
.partition_id(partition_id)
.message_index(next_sequence_number);
Expand All @@ -69,6 +71,7 @@ fn get_outbox_message<S: StorageAccess>(
partition_id: PartitionId,
sequence_number: u64,
) -> Result<Option<OutboxMessage>> {
let _x = RocksDbPerfGuard::new("get-outbox");
let outbox_key = OutboxKey::default()
.partition_id(partition_id)
.message_index(sequence_number);
Expand Down
2 changes: 0 additions & 2 deletions crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,6 @@ impl<'a> RocksDBTransaction<'a> {
let mut opts = ReadOptions::default();
opts.set_iterate_range(PrefixRange(prefix.clone()));
opts.set_prefix_same_as_start(true);
//opts.set_async_io(true);
opts.set_total_order_seek(false);

let mut it = self.txn.raw_iterator_cf_opt(table, opts);
Expand All @@ -484,7 +483,6 @@ impl<'a> RocksDBTransaction<'a> {
// binding.
opts.set_total_order_seek(scan_mode == ScanMode::TotalOrder);
opts.set_iterate_range(from.clone()..to);
//opts.set_async_io(true);
let mut it = self.txn.raw_iterator_cf_opt(table, opts);
it.seek(from);
it
Expand Down
2 changes: 2 additions & 0 deletions crates/partition-store/src/promise_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use bytes::Bytes;
use bytestring::ByteString;
use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::promise_table::{
OwnedPromiseRow, Promise, PromiseTable, ReadOnlyPromiseTable,
};
Expand Down Expand Up @@ -50,6 +51,7 @@ fn get_promise<S: StorageAccess>(
service_id: &ServiceId,
key: &ByteString,
) -> Result<Option<Promise>> {
let _x = RocksDbPerfGuard::new("get-promise");
storage.get_value(create_key(service_id, key))
}

Expand Down
2 changes: 2 additions & 0 deletions crates/partition-store/src/service_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::TableScan::FullScanPartitionKeyRange;
use crate::{PartitionStore, TableKind};
use crate::{RocksDBTransaction, StorageAccess};
use bytestring::ByteString;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::service_status_table::{
ReadOnlyVirtualObjectStatusTable, VirtualObjectStatus, VirtualObjectStatusTable,
};
Expand Down Expand Up @@ -60,6 +61,7 @@ fn get_virtual_object_status<S: StorageAccess>(
storage: &mut S,
service_id: &ServiceId,
) -> Result<VirtualObjectStatus> {
let _x = RocksDbPerfGuard::new("get-virtual-obj-status");
let key = ServiceStatusKey::default()
.partition_key(service_id.partition_key())
.service_name(service_id.service_name.clone())
Expand Down
3 changes: 3 additions & 0 deletions crates/partition-store/src/state_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use bytes::Bytes;
use bytestring::ByteString;
use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::state_table::{ReadOnlyStateTable, StateTable};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{PartitionKey, ServiceId, WithPartitionKey};
Expand Down Expand Up @@ -96,6 +97,7 @@ fn get_user_state<S: StorageAccess>(
service_id: &ServiceId,
state_key: impl AsRef<[u8]>,
) -> Result<Option<Bytes>> {
let _x = RocksDbPerfGuard::new("get-user-state");
let key = write_state_entry_key(service_id, state_key);
storage.get_kv_raw(key, move |_k, v| Ok(v.map(Bytes::copy_from_slice)))
}
Expand All @@ -104,6 +106,7 @@ fn get_all_user_states<S: StorageAccess>(
storage: &mut S,
service_id: &ServiceId,
) -> Vec<Result<(Bytes, Bytes)>> {
let _x = RocksDbPerfGuard::new("get-all-user-state");
let key = StateKey::default()
.partition_key(service_id.partition_key())
.service_name(service_id.service_name.clone())
Expand Down
2 changes: 2 additions & 0 deletions crates/partition-store/src/timer_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{PartitionStore, RocksDBTransaction, StorageAccess};
use crate::{TableScan, TableScanIterationDecision};
use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{InvocationUuid, PartitionId};
Expand Down Expand Up @@ -144,6 +145,7 @@ fn next_timers_greater_than<S: StorageAccess>(
exclusive_start: Option<&TimerKey>,
limit: usize,
) -> Vec<Result<(TimerKey, Timer)>> {
let _x = RocksDbPerfGuard::new("get-next-timers");
let scan = exclusive_start_key_range(partition_id, exclusive_start);
let mut produced = 0;
storage.for_each_key_value_in_place(scan, move |k, v| {
Expand Down
Loading

0 comments on commit 5e5befa

Please sign in to comment.