[Bifrost] Support append_batch
This introduces a new append_batch API to bifrost that accepts a batch of payloads. In addition, we make use of this API to handle batches of action effects when consuming the effect stream. We attempt to batch up to 10 items (hardcoded at the moment) before sending them to bifrost.

Impact: For small invocations, we now average ~4 records (+1 for metadata) per batch since the invoker emits input/get_state/output etc. in bursts. This reduces the overhead of the happy path for small handlers.

This also addresses a small issue where the initial offsets leave an unnecessary gap on re-instantiation.
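A minimal sketch of that batching loop, assuming the effect stream is consumed from a tokio mpsc channel; Effect, next_batch, and MAX_BATCH are illustrative names rather than the actual invoker types:

use tokio::sync::mpsc;

/// Hypothetical stand-in for a serialized action effect.
type Effect = Vec<u8>;

/// Batch limit; hardcoded to 10, as described above.
const MAX_BATCH: usize = 10;

/// Wait for one effect, then greedily drain whatever else is already
/// buffered, up to MAX_BATCH items, before handing the batch to bifrost.
async fn next_batch(rx: &mut mpsc::Receiver<Effect>) -> Option<Vec<Effect>> {
    let first = rx.recv().await?;
    let mut batch = vec![first];
    while batch.len() < MAX_BATCH {
        match rx.try_recv() {
            Ok(effect) => batch.push(effect),
            Err(_) => break, // nothing else buffered right now
        }
    }
    Some(batch)
}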
AhmedSoliman committed May 22, 2024
1 parent e8b09c3 commit e5b89d4
Showing 15 changed files with 245 additions and 80 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -131,6 +131,7 @@ static_assertions = { version = "1.1.0" }
strum = { version = "0.26.1" }
strum_macros = { version = "0.26.1" }
sync_wrapper = "0.1.2"
smallvec = { version = "1.13.2", features = ["serde"] }
tempfile = "3.6.0"
test-log = { version = "0.2.11", default-features = false, features = ["trace"] }
# tikv-jemallocator has not yet been released with musl target support, so we pin a main commit
4 changes: 2 additions & 2 deletions crates/bifrost/Cargo.toml
@@ -24,6 +24,7 @@ codederror = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
futures = { workspace = true }
humantime = { workspace = true }
metrics = { workspace = true }
once_cell = { workspace = true }
@@ -32,7 +33,7 @@ schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
smallvec = { version = "1.13.2", features = ["serde"] }
smallvec = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
@@ -48,7 +49,6 @@ restate-metadata-store = { workspace = true }
restate-test-util = { workspace = true }

criterion = { workspace = true, features = ["async_tokio"] }
futures = { workspace = true }
googletest = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
29 changes: 28 additions & 1 deletion crates/bifrost/src/bifrost.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
use bytes::BytesMut;
use enum_map::EnumMap;
use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use tracing::{error, instrument};

use restate_core::{metadata, Metadata, MetadataKind};
@@ -28,7 +29,7 @@ use restate_types::Version;

use crate::loglet::{LogletBase, LogletProvider, LogletWrapper};
use crate::watchdog::{WatchdogCommand, WatchdogSender};
use crate::{Error, FindTailAttributes, LogReadStream, LogRecord};
use crate::{Error, FindTailAttributes, LogReadStream, LogRecord, SMALL_BATCH_THRESHOLD_COUNT};

/// Bifrost is Restate's durable interconnect system
///
@@ -68,6 +69,18 @@ impl Bifrost {
self.inner.append(log_id, payload).await
}

/// Appends a batch of records to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]. The returned Lsn is the Lsn of the first
/// record in this batch. This will only return after all records have been stored.
#[instrument(level = "debug", skip(self, payloads), err)]
pub async fn append_batch(
&mut self,
log_id: LogId,
payloads: &[Payload],
) -> Result<Lsn, Error> {
self.inner.append_batch(log_id, payloads).await
}

/// Read the next record after the LSN provided. `after` indicates the LSN we will read
/// after, meaning that the record returned will have an LSN strictly greater than `after`.
/// If no records are committed yet after this LSN, this read operation will "wait"
@@ -165,6 +178,20 @@ impl BifrostInner {
loglet.append(buf.freeze()).await
}

pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn, Error> {
let loglet = self.writeable_loglet(log_id).await?;
let raw_payloads: SmallVec<[_; SMALL_BATCH_THRESHOLD_COUNT]> = payloads
.iter()
.map(|payload| {
let mut buf = BytesMut::new();
StorageCodec::encode(payload, &mut buf)
.expect("serialization to bifrost is infallible");
buf.freeze()
})
.collect();
loglet.append_batch(&raw_payloads).await
}

pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord, Error> {
self.fail_if_shutting_down()?;

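A caller-side sketch of the new API, assuming a ready Bifrost handle, an existing log_id, and a Payload conversion from &str (the conversion is an assumption for illustration, not a verified constructor):

async fn write_burst(bifrost: &mut Bifrost, log_id: LogId) -> Result<Lsn, Error> {
    // One batched append for the whole burst instead of three round-trips.
    let payloads: Vec<Payload> = ["input", "get_state", "output"]
        .into_iter()
        .map(Payload::from) // assumed conversion, for illustration only
        .collect();
    // Per the doc comment above: the returned Lsn is that of the first
    // record, and the call resolves only after all records are stored.
    bifrost.append_batch(log_id, &payloads).await
}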
2 changes: 2 additions & 0 deletions crates/bifrost/src/lib.rs
@@ -24,3 +24,5 @@ pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
pub use types::*;

pub const SMALL_BATCH_THRESHOLD_COUNT: usize = 4;
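SMALL_BATCH_THRESHOLD_COUNT sizes the SmallVec inline buffers used on the batch paths, so the ~4-record happy-path batches from the commit message avoid a heap allocation. A quick self-contained illustration of that behavior:

use smallvec::SmallVec;

const SMALL_BATCH_THRESHOLD_COUNT: usize = 4; // mirrors the constant above

fn main() {
    let mut batch: SmallVec<[u32; SMALL_BATCH_THRESHOLD_COUNT]> = SmallVec::new();
    batch.extend([1, 2, 3, 4]);
    assert!(!batch.spilled()); // four records still live inline on the stack
    batch.push(5);
    assert!(batch.spilled()); // a fifth record spills to the heap transparently
}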
18 changes: 18 additions & 0 deletions crates/bifrost/src/loglet.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::Add;
use std::sync::Arc;

use async_trait::async_trait;
@@ -44,6 +45,14 @@ pub fn create_provider(kind: ProviderKind) -> Result<Arc<dyn LogletProvider>, Pr
)]
pub struct LogletOffset(pub(crate) u64);

impl Add<usize> for LogletOffset {
type Output = Self;
fn add(self, rhs: usize) -> Self {
// we assume a 64-bit target, so the usize -> u64 cast is lossless.
Self(self.0.saturating_add(rhs as u64))
}
}

impl SequenceNumber for LogletOffset {
const MAX: Self = LogletOffset(u64::MAX);
const INVALID: Self = LogletOffset(0);
@@ -107,6 +116,10 @@ pub trait LogletBase: Send + Sync {
/// Append a record to the loglet.
async fn append(&self, data: Bytes) -> Result<Self::Offset, Error>;

/// Append a batch of records to the loglet. The returned offset (on success) is the offset
/// of the first record in the batch.
async fn append_batch(&self, payloads: &[Bytes]) -> Result<Self::Offset, Error>;

/// Find the tail of the loglet. If the loglet is empty or has been trimmed, the loglet should
/// return `None`.
async fn find_tail(&self) -> Result<Option<Self::Offset>, Error>;
@@ -139,6 +152,11 @@ impl LogletBase for LogletWrapper {
Ok(self.base_lsn.offset_by(offset))
}

async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn, Error> {
let offset = self.loglet.append_batch(payloads).await?;
Ok(self.base_lsn.offset_by(offset))
}

async fn find_tail(&self) -> Result<Option<Lsn>, Error> {
let offset = self.loglet.find_tail().await?;
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
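The new Add<usize> impl provides the offset arithmetic the batch paths rely on. A self-contained sketch of its semantics, using a stand-in copy of the type for illustration:

use std::ops::Add;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LogletOffset(u64);

impl Add<usize> for LogletOffset {
    type Output = Self;
    fn add(self, rhs: usize) -> Self {
        Self(self.0.saturating_add(rhs as u64))
    }
}

fn main() {
    let first = LogletOffset(10);
    // a 4-record batch starting at offset 10 fills offsets 10..=13,
    // so first + 4 is the next available slot
    assert_eq!(first + 4, LogletOffset(14));
    // saturating_add pins the offset at MAX instead of wrapping on overflow
    assert_eq!(LogletOffset(u64::MAX) + 1, LogletOffset(u64::MAX));
}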
48 changes: 35 additions & 13 deletions crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
@@ -11,30 +11,36 @@
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use metrics::histogram;
use restate_rocksdb::{IoMode, Priority, RocksDb};
use restate_types::arc_util::Updateable;
use restate_types::config::LocalLogletOptions;
use rocksdb::{BoundColumnFamily, WriteBatch};
use smallvec::SmallVec;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::{debug, error, trace, warn};

use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
use restate_types::arc_util::Updateable;
use restate_types::config::LocalLogletOptions;
use restate_types::logs::SequenceNumber;

use crate::loglet::LogletOffset;
use crate::Error;
use crate::{Error, SMALL_BATCH_THRESHOLD_COUNT};

use super::keys::{MetadataKey, MetadataKind, RecordKey};
use super::log_state::LogStateUpdates;
use super::log_store::{DATA_CF, METADATA_CF};
use super::metric_definitions::{
BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES,
};

type Ack = oneshot::Sender<Result<(), Error>>;
type AckRecv = oneshot::Receiver<Result<(), Error>>;

pub struct LogStoreWriteCommand {
log_id: u64,
data_update: Option<DataUpdate>,
data_updates: SmallVec<[DataUpdate; SMALL_BATCH_THRESHOLD_COUNT]>,
log_state_updates: Option<LogStateUpdates>,
ack: Option<Ack>,
}
@@ -125,7 +131,7 @@ impl LogStoreWriter {
.expect("metadata cf exists");

for command in commands {
if let Some(data_command) = command.data_update {
for data_command in command.data_updates {
match data_command {
DataUpdate::PutRecord { offset, data } => Self::put_record(
&data_cf,
@@ -153,6 +159,8 @@
}
}

histogram!(BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES).record(write_batch.size_in_bytes() as f64);
histogram!(BIFROST_LOCAL_WRITE_BATCH_COUNT).record(write_batch.len() as f64);
self.commit(opts, write_batch).await;
}

@@ -241,20 +249,34 @@ impl RocksDbLogWriterHandle {
log_id: u64,
offset: LogletOffset,
data: Bytes,
release_immediately: bool,
) -> Result<AckRecv, ShutdownError> {
self.enqueue_put_records(log_id, offset, &[data]).await
}

pub async fn enqueue_put_records(
&self,
log_id: u64,
mut start_offset: LogletOffset,
records: &[Bytes],
) -> Result<AckRecv, ShutdownError> {
let (ack, receiver) = oneshot::channel();
let data_update = Some(DataUpdate::PutRecord { offset, data });
let log_state_updates = if release_immediately {
Some(LogStateUpdates::default().update_release_pointer(offset))
} else {
None
};
let mut data_updates = SmallVec::with_capacity(records.len());
for record in records {
data_updates.push(DataUpdate::PutRecord {
offset: start_offset,
data: record.clone(),
});
start_offset = start_offset.next();
}

let data_updates = data_updates;
let log_state_updates =
Some(LogStateUpdates::default().update_release_pointer(start_offset.prev()));
if let Err(e) = self
.sender
.send(LogStoreWriteCommand {
log_id,
data_update,
data_updates,
log_state_updates,
ack: Some(ack),
})
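In enqueue_put_records above, each record takes the current start_offset and the counter then advances, so after the loop start_offset points one past the last record and start_offset.prev() is the release pointer. A standalone sketch of that bookkeeping, with stand-in types:

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Offset(u64);

impl Offset {
    fn next(self) -> Self {
        Self(self.0 + 1)
    }
    fn prev(self) -> Self {
        Self(self.0.saturating_sub(1))
    }
}

fn main() {
    let records = ["a", "b", "c"];
    let mut start_offset = Offset(5);
    let mut assigned = Vec::new();
    for _record in records {
        assigned.push(start_offset); // each record gets the current offset
        start_offset = start_offset.next();
    }
    assert_eq!(assigned, [Offset(5), Offset(6), Offset(7)]);
    // one past the last record now; the release pointer is its predecessor
    assert_eq!(start_offset.prev(), Offset(7));
}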
19 changes: 19 additions & 0 deletions crates/bifrost/src/loglets/local_loglet/metric_definitions.rs
@@ -17,12 +17,31 @@ pub(crate) const BIFROST_LOCAL_APPEND: &str = "restate.bifrost.localloglet.appen
pub(crate) const BIFROST_LOCAL_APPEND_DURATION: &str =
"restate.bifrost.localloglet.append_duration.seconds";

pub(crate) const BIFROST_LOCAL_WRITE_BATCH_COUNT: &str =
"restate.bifrost.localloglet.write_batch_count";

pub(crate) const BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES: &str =
"restate.bifrost.localloglet.write_batch_size_bytes";

pub(crate) fn describe_metrics() {
describe_counter!(
BIFROST_LOCAL_APPEND,
Unit::Count,
"Number of append requests to bifrost's local loglet"
);

describe_histogram!(
BIFROST_LOCAL_WRITE_BATCH_COUNT,
Unit::Count,
"Histogram of the number of records in each append request to local loglet"
);

describe_histogram!(
BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES,
Unit::Bytes,
"Histogram of size in bytes of local loglet write batches"
);

describe_histogram!(
BIFROST_LOCAL_APPEND_DURATION,
Unit::Seconds,
46 changes: 38 additions & 8 deletions crates/bifrost/src/loglets/local_loglet/mod.rs
@@ -159,17 +159,13 @@ impl LogletBase for LocalLoglet {
let (receiver, offset) = {
let mut next_offset_guard = self.next_write_offset.lock().await;
// lock acquired
let offset = next_offset_guard.next();
let offset = *next_offset_guard;
let receiver = self
.log_writer
.enqueue_put_record(
self.log_id,
offset,
payload,
true, /* release_immediately */
)
.enqueue_put_record(self.log_id, offset, payload)
.await?;
*next_offset_guard = offset;
// next offset points to the next available slot.
*next_offset_guard = offset.next();
(receiver, offset)
// lock dropped
};
@@ -186,6 +182,40 @@
Ok(offset)
}

async fn append_batch(&self, payloads: &[Bytes]) -> Result<LogletOffset, Error> {
let num_payloads = payloads.len();
counter!(BIFROST_LOCAL_APPEND).increment(num_payloads as u64);
let start_time = std::time::Instant::now();
// We hold the lock to ensure that records are enqueued into the logstore
// writer in offset order. This means that an acknowledgement of offset N
// from the writer implies that all previous offsets have been durably
// committed; therefore, those offsets can be released to readers.
let (receiver, offset) = {
let mut next_offset_guard = self.next_write_offset.lock().await;
let offset = *next_offset_guard;
// lock acquired
let receiver = self
.log_writer
.enqueue_put_records(self.log_id, *next_offset_guard, payloads)
.await?;
// next offset points to the next available slot.
*next_offset_guard = offset + num_payloads;
(receiver, next_offset_guard.prev())
// lock dropped
};

let _ = receiver.await.unwrap_or_else(|_| {
warn!("Unsure if the local loglet record was written, the ack channel was dropped");
Err(Error::Shutdown(ShutdownError))
})?;

self.last_committed_offset
.fetch_max(offset.into(), Ordering::Relaxed);
self.notify_readers();
histogram!(BIFROST_LOCAL_APPEND_DURATION).record(start_time.elapsed());
Ok(offset)
}

async fn find_tail(&self) -> Result<Option<LogletOffset>, Error> {
let last_committed = LogletOffset::from(self.last_committed_offset.load(Ordering::Relaxed));
if last_committed == LogletOffset::INVALID {
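The comment in append_batch above pins down why the mutex scope matters: the offset range is reserved and the batch enqueued under the same lock, so a later batch can never overtake an earlier one on the way to the writer. A distilled sketch of that reserve-and-enqueue pattern, with stand-in types rather than the real writer:

use tokio::sync::{mpsc, Mutex};

struct Appender {
    next_write_offset: Mutex<u64>,
    // (first offset, batch length) commands headed for the log writer
    writer_queue: mpsc::Sender<(u64, usize)>,
}

impl Appender {
    /// Reserve a contiguous offset range and enqueue it atomically, so an
    /// ack for offset N implies every earlier offset is already durable.
    async fn append_batch(&self, num_payloads: usize) -> u64 {
        let mut next_offset = self.next_write_offset.lock().await;
        let first = *next_offset;
        // still holding the lock: enqueue order matches offset order
        self.writer_queue
            .send((first, num_payloads))
            .await
            .expect("writer is running");
        *next_offset = first + num_payloads as u64;
        first
    }
}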
17 changes: 17 additions & 0 deletions crates/bifrost/src/loglets/memory_loglet.rs
@@ -219,6 +219,23 @@ impl LogletBase for MemoryLoglet {
Ok(offset)
}

async fn append_batch(&self, payloads: &[Bytes]) -> Result<LogletOffset, Error> {
let mut log = self.log.lock().unwrap();
let mut offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next();
let first_offset = offset;
let num_payloads = payloads.len();
for payload in payloads {
debug!(
"Appending record to in-memory loglet {:?} at offset {}",
self.params, offset,
);
log.push(payload.clone());
offset = offset.next();
}
// mark as committed immediately.
self.advance_commit_offset(first_offset + num_payloads);
Ok(first_offset)
}

async fn find_tail(&self) -> Result<Option<LogletOffset>, Error> {
let log = self.log.lock().unwrap();
if log.is_empty() {
1 change: 1 addition & 0 deletions crates/worker/Cargo.toml
@@ -63,6 +63,7 @@ schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
smallvec = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }