From 347768ff8e1af26f1e0f3f278c86a4fbaa6a496d Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Wed, 22 May 2024 13:47:08 +0100
Subject: [PATCH] Default to low-latency local loglet configuration

This PR changes the default configuration of the local loglet to disable
time-based batching. Most users' first impressions and general use will
expect an interactive, low-latency experience, and we want this to be the
default case.

A major performance win is the change in the local loglet implementation.
Tokio has a 1ms timer resolution, so even if the batching duration is set to
zero, the runtime will still sleep an arbitrary amount between 0 and 1ms
before resuming the stream. The change switches to chunking ready items by
default when the time-based batching duration is zero. This results in 6x
lower append-path latency in my tests (even with fdatasync running on fast
NVMe).
---
 .../loglets/local_loglet/log_store_writer.rs | 26 +++++++++++++++----
 crates/types/src/config/bifrost.rs           |  2 +-
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs b/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
index da9bf8c20..b967923ff 100644
--- a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
+++ b/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
@@ -9,15 +9,17 @@
 // by the Apache License, Version 2.0.
 
 use std::sync::Arc;
+use std::time::Duration;
 
 use bytes::{Bytes, BytesMut};
+use futures::StreamExt as FutureStreamExt;
 use metrics::histogram;
 use restate_rocksdb::{IoMode, Priority, RocksDb};
 use rocksdb::{BoundColumnFamily, WriteBatch};
 use smallvec::SmallVec;
 use tokio::sync::{mpsc, oneshot};
 use tokio_stream::wrappers::ReceiverStream;
-use tokio_stream::StreamExt;
+use tokio_stream::StreamExt as TokioStreamExt;
 use tracing::{debug, error, trace, warn};
 
 use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
@@ -73,6 +75,8 @@ impl LogStoreWriter {
     ) -> Result<RocksDbLogWriterHandle, ShutdownError> {
         // big enough to allows a second full batch to queue up while the existing one is being processed
         let batch_size = std::cmp::max(1, updateable.load().writer_batch_commit_count);
+        // leave twice as much space in the channel to ensure we can enqueue up to a full batch in
+        // the backlog while we process this one.
         let (sender, receiver) = mpsc::channel(batch_size * 2);
 
         task_center().spawn_child(
@@ -82,9 +86,19 @@
             async move {
                 let opts = updateable.load();
                 let batch_size = std::cmp::max(1, opts.writer_batch_commit_count);
-                let batch_duration = opts.writer_batch_commit_duration.into();
-                let receiver =
-                    ReceiverStream::new(receiver).chunks_timeout(batch_size, batch_duration);
+                let batch_duration: Duration = opts.writer_batch_commit_duration.into();
+                // We don't want to use chunks_timeout if time-based batching is disabled:
+                // even if the duration is zero, tokio's timer resolution is 1ms, which means
+                // we would delay every batch by up to 1ms for no reason.
+                let receiver = if batch_duration == Duration::ZERO {
+                    ReceiverStream::new(receiver)
+                        .ready_chunks(batch_size)
+                        .boxed()
+                } else {
+                    ReceiverStream::new(receiver)
+                        .chunks_timeout(batch_size, batch_duration)
+                        .boxed()
+                };
                 tokio::pin!(receiver);
 
                 loop {
@@ -93,7 +107,7 @@
                         _ = cancellation_watcher() => {
                             break;
                         }
-                        Some(cmds) = receiver.next() => {
+                        Some(cmds) = TokioStreamExt::next(&mut receiver) => {
                             let opts = updateable.load();
                             self.handle_commands(opts, cmds).await;
                         }
@@ -143,6 +157,8 @@
                 }
             }
 
+            // todo: future optimization. pre-merge all updates within a batch before writing
+            // the merge to rocksdb.
             if let Some(logstate_updates) = command.log_state_updates {
                 Self::update_log_state(
                     &metadata_cf,
diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs
index fdebbf285..3cd65002f 100644
--- a/crates/types/src/config/bifrost.rs
+++ b/crates/types/src/config/bifrost.rs
@@ -105,7 +105,7 @@ impl Default for LocalLogletOptions {
             batch_wal_flushes: true,
             sync_wal_before_ack: true,
             writer_batch_commit_count: 500,
-            writer_batch_commit_duration: Duration::from_nanos(5).into(),
+            writer_batch_commit_duration: Duration::ZERO.into(),
             #[cfg(any(test, feature = "test-util"))]
             data_dir: super::default_arc_tmp(),
         }
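
For reference, a standalone sketch (not part of the patch) of the latency difference the
commit message describes. It assumes a scratch Cargo project with tokio (features "macros",
"rt-multi-thread", "sync", "time"), tokio-stream (feature "time"), and futures as
dependencies; the payload string and the batch size are invented for the example. A single
queued item never fills the batch, so chunks_timeout has to wait for a tokio timer (roughly
1ms granularity) before emitting the partial chunk, whereas ready_chunks hands it over as
soon as it is buffered.

// Sketch only: compares per-chunk latency of chunks_timeout(_, Duration::ZERO)
// against ready_chunks when a single item is already queued.
use std::time::{Duration, Instant};

use futures::StreamExt as FuturesStreamExt;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt as TokioStreamExt;

#[tokio::main]
async fn main() {
    let batch_size = 500;

    // chunks_timeout with a zero duration: one item does not fill the batch, so the
    // stream waits for a tokio timer before emitting the partial chunk. Tokio's timer
    // granularity is roughly 1ms, so the lone item can be delayed by up to ~1ms.
    let (tx, rx) = tokio::sync::mpsc::channel(batch_size);
    let mut timed =
        TokioStreamExt::chunks_timeout(ReceiverStream::new(rx), batch_size, Duration::ZERO)
            .boxed();
    tx.send("append").await.unwrap();
    let start = Instant::now();
    let chunk = FuturesStreamExt::next(&mut timed).await.unwrap();
    println!("chunks_timeout: {:?} for {} item(s)", start.elapsed(), chunk.len());

    // ready_chunks: emits whatever is already buffered without arming a timer, so the
    // same lone item is handed to the consumer almost immediately.
    let (tx, rx) = tokio::sync::mpsc::channel(batch_size);
    let mut ready =
        FuturesStreamExt::ready_chunks(ReceiverStream::new(rx), batch_size).boxed();
    tx.send("append").await.unwrap();
    let start = Instant::now();
    let chunk = FuturesStreamExt::next(&mut ready).await.unwrap();
    println!("ready_chunks:   {:?} for {} item(s)", start.elapsed(), chunk.len());
}

On my reading of tokio's timer wheel, the first measurement should land on the order of a
millisecond while the second should be in the microsecond range, which is the gap the
patch removes from the append path when time-based batching is disabled.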