Default to low-latency local loglet configuration #1546

Merged 2 commits on May 24, 2024
26 changes: 21 additions & 5 deletions crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
@@ -9,15 +9,17 @@
// by the Apache License, Version 2.0.

use std::sync::Arc;
use std::time::Duration;

use bytes::{Bytes, BytesMut};
use futures::StreamExt as FutureStreamExt;
use metrics::histogram;
use restate_rocksdb::{IoMode, Priority, RocksDb};
use rocksdb::{BoundColumnFamily, WriteBatch};
use smallvec::SmallVec;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tokio_stream::StreamExt as TokioStreamExt;
use tracing::{debug, error, trace, warn};

use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
@@ -73,6 +75,8 @@ impl LogStoreWriter {
) -> Result<RocksDbLogWriterHandle, ShutdownError> {
// big enough to allow a second full batch to queue up while the existing one is being processed
let batch_size = std::cmp::max(1, updateable.load().writer_batch_commit_count);
// leave twice as much space in the channel so that up to a full batch can queue up in
// the backlog while the current one is being processed.
let (sender, receiver) = mpsc::channel(batch_size * 2);

task_center().spawn_child(
@@ -82,9 +86,19 @@
async move {
let opts = updateable.load();
let batch_size = std::cmp::max(1, opts.writer_batch_commit_count);
let batch_duration = opts.writer_batch_commit_duration.into();
let receiver =
ReceiverStream::new(receiver).chunks_timeout(batch_size, batch_duration);
let batch_duration: Duration = opts.writer_batch_commit_duration.into();
// Avoid chunks_timeout when time-based batching is disabled: even with a zero
// duration, tokio's timer resolution is 1ms, which would delay every batch by
// 1ms for no reason.
let receiver = if batch_duration == Duration::ZERO {
ReceiverStream::new(receiver)
.ready_chunks(batch_size)
.boxed()
} else {
ReceiverStream::new(receiver)
.chunks_timeout(batch_size, batch_duration)
.boxed()
};
tokio::pin!(receiver);

loop {
@@ -93,7 +107,7 @@
_ = cancellation_watcher() => {
break;
}
Some(cmds) = receiver.next() => {
Some(cmds) = TokioStreamExt::next(&mut receiver) => {
let opts = updateable.load();
self.handle_commands(opts, cmds).await;
}
@@ -143,6 +157,8 @@ impl LogStoreWriter {
}
}

// todo: future optimization. pre-merge all updates within a batch before writing
// the merged result to rocksdb.
if let Some(logstate_updates) = command.log_state_updates {
Self::update_log_state(
&metadata_cf,
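
The branch above chooses between two batching strategies at startup. Below is a minimal, self-contained sketch of the same pattern, not the crate's actual writer: the channel capacity, the u64 item type, and the toy producer are illustrative, and chunks_timeout assumes tokio-stream is built with its time feature (plus tokio's rt/macros features for the main attribute).

use std::time::Duration;

use futures::StreamExt as FuturesStreamExt; // ready_chunks, boxed
use tokio_stream::{wrappers::ReceiverStream, StreamExt as TokioStreamExt}; // chunks_timeout, next

#[tokio::main]
async fn main() {
    let batch_size = 4;
    let batch_duration = Duration::ZERO; // the new default

    let (tx, rx) = tokio::sync::mpsc::channel::<u64>(batch_size * 2);
    tokio::spawn(async move {
        // Toy producer: send a handful of items, then drop the sender to end the stream.
        for i in 0..10u64 {
            let _ = tx.send(i).await;
        }
    });

    // Same shape as the writer's branch: a zero duration skips chunks_timeout entirely,
    // because even a zero timeout goes through tokio's ~1ms-resolution timer and would
    // delay every batch; ready_chunks emits a batch as soon as items are available.
    let mut batches = if batch_duration == Duration::ZERO {
        FuturesStreamExt::boxed(ReceiverStream::new(rx).ready_chunks(batch_size))
    } else {
        FuturesStreamExt::boxed(TokioStreamExt::chunks_timeout(
            ReceiverStream::new(rx),
            batch_size,
            batch_duration,
        ))
    };

    // Fully qualified next() mirrors the PR's disambiguation between the two StreamExt traits.
    while let Some(batch) = TokioStreamExt::next(&mut batches).await {
        println!("committing a batch of {} records", batch.len());
    }
}
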
2 changes: 1 addition & 1 deletion crates/types/src/config/bifrost.rs
@@ -105,7 +105,7 @@ impl Default for LocalLogletOptions {
batch_wal_flushes: true,
sync_wal_before_ack: true,
writer_batch_commit_count: 500,
writer_batch_commit_duration: Duration::from_nanos(5).into(),
writer_batch_commit_duration: Duration::ZERO.into(),
#[cfg(any(test, feature = "test-util"))]
data_dir: super::default_arc_tmp(),
}
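
For context on why the default moves to exactly zero rather than another tiny value: the writer's new branch only takes the low-latency ready_chunks path when the configured duration equals Duration::ZERO, so the old 5ns default would still have routed through chunks_timeout and paid the ~1ms timer granularity. A std-only sketch of that condition (the function name is illustrative, not from the crate):

use std::time::Duration;

// Mirrors the branch added in log_store_writer.rs: only an exactly-zero duration
// disables time-based batching.
fn uses_timer_batching(batch_duration: Duration) -> bool {
    batch_duration != Duration::ZERO
}

fn main() {
    // Old default: 5ns is non-zero, so every batch would still wait on tokio's timer.
    assert!(uses_timer_batching(Duration::from_nanos(5)));

    // New default: zero selects the ready_chunks path and flushes as soon as data is ready.
    assert!(!uses_timer_batching(Duration::ZERO));
}
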
16 changes: 3 additions & 13 deletions crates/types/src/logs/mod.rs
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};

use crate::flexbuffers_storage_encode_decode;
use crate::identifiers::PartitionId;
use crate::time::MillisSinceEpoch;
use crate::time::NanosSinceEpoch;

pub mod metadata;

@@ -111,17 +111,13 @@

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Header {
created_at: MillisSinceEpoch,
// additional custom headers can be added here. Those should be somewhat
// generic and values must be optional.
pub custom_data_1: Option<u64>,
pub created_at: NanosSinceEpoch,
}

impl Default for Header {
fn default() -> Self {
Self {
created_at: MillisSinceEpoch::now(),
custom_data_1: None,
created_at: NanosSinceEpoch::now(),
}
}
}
@@ -149,12 +145,6 @@ impl Payload {
}
}

/// Sets the custom data 1 field on the record header
pub fn with_custom_data_1(mut self, value: u64) -> Self {
self.header.custom_data_1 = Some(value);
self
}

pub fn body(&self) -> &Bytes {
&self.body
}
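
With this change the record header carries a nanosecond-resolution creation timestamp (and drops the unused custom_data_1 field), so the read side can derive a rough append-to-read latency from it. A self-contained sketch of that idea using std-only stand-ins that mirror the shape of the PR's types (not the crate's actual Header or NanosSinceEpoch):

use std::time::{Duration, SystemTime};

// Stand-in with the same shape as the PR's NanosSinceEpoch; behavior is assumed.
#[derive(Clone, Copy)]
struct NanosSinceEpoch(u64);

impl NanosSinceEpoch {
    fn now() -> Self {
        let nanos = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("clock is after the Unix epoch")
            .as_nanos() as u64;
        Self(nanos)
    }

    fn elapsed(&self) -> Duration {
        // Saturate: clock skew between nodes can make `now` smaller than `self`.
        Duration::from_nanos(Self::now().0.saturating_sub(self.0))
    }
}

// Stand-in for the slimmed-down record header.
struct Header {
    created_at: NanosSinceEpoch,
}

fn main() {
    // Stamp the record when it is appended...
    let header = Header { created_at: NanosSinceEpoch::now() };

    // ...and later, on the read side, use the stamp as a rough latency measurement.
    std::thread::sleep(Duration::from_millis(2));
    println!("append-to-read latency: {:?}", header.created_at.elapsed());
}
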
60 changes: 60 additions & 0 deletions crates/types/src/time.rs
@@ -70,6 +70,58 @@ impl From<MillisSinceEpoch> for SystemTime {
}
}

/// Nanos since the Unix epoch. Used internally for rough latency measurements across nodes.
/// It's vulnerable to clock skew and sync issues, so use it with care; that said, it's fairly
/// accurate when used on a single node. It roughly maps to std::time::Instant, except that the
/// value is portable across nodes.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[serde(transparent)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct NanosSinceEpoch(u64);

impl NanosSinceEpoch {
pub fn now() -> Self {
SystemTime::now().into()
}

pub fn as_u64(&self) -> u64 {
self.0
}

pub fn elapsed(&self) -> Duration {
let now = Self::now();
Duration::from_nanos(now.0 - self.0)
}
}

impl Default for NanosSinceEpoch {
fn default() -> Self {
Self::now()
}
}

impl From<u64> for NanosSinceEpoch {
fn from(value: u64) -> Self {
Self(value)
}
}

impl From<SystemTime> for NanosSinceEpoch {
fn from(value: SystemTime) -> Self {
Self(
u64::try_from(
value
.duration_since(SystemTime::UNIX_EPOCH)
.expect("duration since Unix epoch should be well-defined")
.as_nanos(),
)
.expect("nanos since Unix epoch should fit in u64"),
)
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -81,4 +133,12 @@ mod tests {
let t: SystemTime = MillisSinceEpoch::new(u64::MAX).into();
println!("{:?}", t);
}

#[test]
fn nanos_should_not_overflow() {
// u64 can hold roughly 580 years of nanoseconds since the Unix epoch, so this will not overflow for a very long time.
let t = NanosSinceEpoch::now().as_u64();
assert!(t < u64::MAX);
println!("{:?}", t);
}
}
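
A short usage sketch of the new type, exercising only the API shown in this diff; the restate_types::time import path is assumed from the file location, and cross-node comparisons still require reasonably synchronized clocks:

use std::time::SystemTime;

// Import path assumed from crates/types/src/time.rs.
use restate_types::time::NanosSinceEpoch;

fn main() {
    // Capture a wall-clock timestamp with nanosecond resolution.
    let t = NanosSinceEpoch::now();

    // The raw u64 round-trips via as_u64()/From<u64>, which is what makes the value
    // portable across nodes (unlike std::time::Instant).
    let raw = t.as_u64();
    let restored = NanosSinceEpoch::from(raw);
    assert_eq!(restored.as_u64(), raw);

    // SystemTime converts in directly as well.
    let from_system_time = NanosSinceEpoch::from(SystemTime::now());
    println!("nanos from SystemTime: {}", from_system_time.as_u64());

    // elapsed() subtracts against a fresh now(), so clock adjustments or cross-node
    // skew can distort the result; treat it as a best-effort measurement.
    println!("elapsed since t: {:?}", t.elapsed());
}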