Implement simple log truncation mechanism
This commit introduces a simple log truncation mechanism. It works as
follows:

The PartitionProcessorManager spawns a PersistedLogLsnWatchdog which
periodically checks the latest persisted applied log LSN for all
registered partition stores. It does so by reading the applied LSN and
then triggering a memtable flush. To avoid overwhelming the I/O system,
the watchdog triggers the flushes one after another. The default check
interval is 60 minutes. Additionally, the watchdog only persists the
applied LSN if the difference to the previously persisted applied LSN
exceeds a configurable threshold (default: 1000).
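
The per-store check boils down to a threshold comparison followed by a
memtable flush. Below is a minimal sketch of that decision using plain
u64 LSNs and a hypothetical store trait; names and signatures are
illustrative, not the actual watchdog code:

    /// Hypothetical, simplified view of a partition store, for illustration only.
    trait PartitionStoreLike {
        fn applied_lsn(&self) -> u64;
        fn flush_memtables(&mut self);
    }

    /// Persist the applied LSN only if it advanced by at least `threshold`
    /// since the last persisted value; returns the newly persisted LSN, if any.
    fn maybe_persist_applied_lsn<S: PartitionStoreLike>(
        store: &mut S,
        last_persisted: u64,
        threshold: u64,
    ) -> Option<u64> {
        // Read the applied LSN first, then flush: everything up to (at least)
        // this LSN is on disk once the flush completes.
        let applied = store.applied_lsn();
        if applied.saturating_sub(last_persisted) < threshold {
            return None; // not enough progress since the last persist, skip the I/O
        }
        store.flush_memtables();
        Some(applied)
    }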

The watchdog sends the latest persisted applied log LSN to the
PartitionProcessorManager, where this information is used to enrich the
PartitionProcessorStatus. The PartitionProcessorStatus is periodically
sent to the ClusterController service in response to its heartbeats.

The ClusterController service periodically compares the latest persisted
log LSNs with the latest trim points of the existing logs. If the
difference exceeds a threshold, it triggers trimming of the log. The
default interval for this check is 60 minutes and the default threshold
is 1000 log entries.
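
In essence, the controller-side decision is a comparison of the reported
persisted LSNs against the current trim point. The sketch below shows one
plausible reading of that check (trimming up to the smallest reported
persisted LSN); the exact policy and the names used here are assumptions,
not the committed code:

    /// Decide whether a log can be trimmed further. `persisted_lsns` holds the
    /// latest persisted applied LSNs reported by the partition processors for
    /// this log. Returns the new trim point if the log advanced by at least
    /// `threshold` entries beyond the current trim point.
    fn compute_new_trim_point(
        current_trim_point: u64,
        persisted_lsns: &[u64],
        threshold: u64,
    ) -> Option<u64> {
        // Trimming past the slowest reporter would delete records it may still
        // need, so the candidate is the minimum of all reported persisted LSNs.
        let candidate = persisted_lsns.iter().copied().min()?;
        (candidate.saturating_sub(current_trim_point) >= threshold).then_some(candidate)
    }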

Additionally, this commit adds a TrimLog handler to the ClusterCtrlSvc
which allows manually trimming a given log to a given trim point. This
endpoint is intended for internal usage only (as an escape hatch).
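
Under the hood such a handler can delegate directly to the Bifrost::trim
API added in this commit, which already caps the requested trim point at
the current tail. A rough sketch (the import paths and handler shape are
assumptions):

    use restate_bifrost::{Bifrost, Error}; // assumed crate/module paths
    use restate_types::logs::{LogId, Lsn};

    /// Illustrative core of a manual TrimLog handler: forward the request to
    /// Bifrost, which trims to min(trim_point, current tail) per loglet.
    async fn handle_trim_log(
        bifrost: &Bifrost,
        log_id: LogId,
        trim_point: Lsn,
    ) -> Result<(), Error> {
        bifrost.trim(log_id, trim_point).await
    }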

This fixes #1502.
tillrohrmann authored and AhmedSoliman committed May 28, 2024
1 parent 16e2e4d commit 5973193
Showing 28 changed files with 1,139 additions and 76 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions crates/bifrost/Cargo.toml
@@ -47,9 +47,11 @@ tracing = { workspace = true }
restate-core = { workspace = true, features = ["test-util"] }
restate-metadata-store = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }

criterion = { workspace = true, features = ["async_tokio"] }
googletest = { workspace = true }
test-log = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
157 changes: 151 additions & 6 deletions crates/bifrost/src/bifrost.rs
@@ -22,7 +22,7 @@ use smallvec::SmallVec;
use tracing::{error, instrument};

use restate_core::{metadata, Metadata, MetadataKind};
use restate_types::logs::metadata::ProviderKind;
use restate_types::logs::metadata::{ProviderKind, Segment};
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;
@@ -114,6 +114,18 @@ impl Bifrost {
self.inner.find_tail(log_id, attributes).await
}

/// The lsn of the slot **before** the first readable record (if it exists), or the offset
/// before the next slot that will be written to.
pub async fn get_trim_point(&self, log_id: LogId) -> Result<Option<Lsn>, Error> {
self.inner.get_trim_point(log_id).await
}

/// Trims the given log to the minimum of the provided trim point or the current tail.
#[instrument(level = "debug", skip(self), err)]
pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> {
self.inner.trim(log_id, trim_point).await
}

/// The version of the currently loaded logs metadata
pub fn version(&self) -> Version {
metadata().logs_version()
@@ -228,6 +240,56 @@ impl BifrostInner {
loglet.find_tail().await
}

async fn get_trim_point(&self, log_id: LogId) -> Result<Option<Lsn>, Error> {
self.fail_if_shutting_down()?;

let logs = self.metadata.logs().ok_or(Error::UnknownLogId(log_id))?;
let log_chain = logs.logs.get(&log_id).ok_or(Error::UnknownLogId(log_id))?;

let mut trim_point = None;

// iterate over the chain until we find the first missing trim point, return value before
// todo: maybe update configuration to remember trim point for the whole chain
for segment in log_chain.iter() {
let loglet = self.get_loglet(&segment).await?;
let loglet_specific_trim_point = loglet.get_trim_point().await?;

// if a loglet has no trim point, then all subsequent loglets should also not contain a trim point
if loglet_specific_trim_point.is_none() {
break;
}

trim_point = loglet_specific_trim_point;
}

Ok(trim_point)
}

async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> {
self.fail_if_shutting_down()?;

let logs = self.metadata.logs().ok_or(Error::UnknownLogId(log_id))?;
let log_chain = logs.logs.get(&log_id).ok_or(Error::UnknownLogId(log_id))?;

for segment in log_chain.iter() {
let loglet = self.get_loglet(&segment).await?;

if loglet.base_lsn > trim_point {
break;
}

if let Some(local_trim_point) =
loglet.find_tail().await?.map(|tail| tail.min(trim_point))
{
loglet.trim(local_trim_point).await?;
}
}

// todo: Update logs configuration to remove sealed and empty loglets

Ok(())
}

#[inline]
fn fail_if_shutting_down(&self) -> Result<(), Error> {
if self.shutting_down.load(Ordering::Relaxed) {
@@ -286,10 +348,7 @@ impl BifrostInner {
.logs()
.and_then(|logs| logs.tail_segment(log_id))
.ok_or(Error::UnknownLogId(log_id))?;
let provider = self.provider_for(tail_segment.config.kind);
let loglet = provider.get_loglet(&tail_segment.config.params).await?;

Ok(LogletWrapper::new(tail_segment.base_lsn, loglet))
self.get_loglet(&tail_segment).await
}

async fn find_loglet_for_lsn(&self, log_id: LogId, lsn: Lsn) -> Result<LogletWrapper, Error> {
@@ -298,9 +357,12 @@
.logs()
.and_then(|logs| logs.find_segment_for_lsn(log_id, lsn))
.ok_or(Error::UnknownLogId(log_id))?;
self.get_loglet(&segment).await
}

async fn get_loglet(&self, segment: &Segment) -> Result<LogletWrapper, Error> {
let provider = self.provider_for(segment.config.kind);
let loglet = provider.get_loglet(&segment.config.params).await?;

Ok(LogletWrapper::new(segment.base_lsn, loglet))
}
}
@@ -314,10 +376,15 @@ mod tests {
use crate::loglets::memory_loglet::MemoryLogletProvider;
use googletest::prelude::*;

use crate::{Record, TrimGap};
use restate_core::TestCoreEnv;
use restate_core::{task_center, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::CommonOptions;
use restate_types::logs::SequenceNumber;
use restate_types::partition_table::FixedPartitionTable;
use test_log::test;
use tracing::info;
use tracing_test::traced_test;

@@ -422,4 +489,82 @@ mod tests {
})
.await
}

#[test(tokio::test)]
async fn trim_log_smoke_test() -> Result<()> {
let node_env = TestCoreEnvBuilder::new_with_mock_network()
.set_provider_kind(ProviderKind::Local)
.build()
.await;
node_env
.tc
.run_in_scope("test", None, async {
RocksDbManager::init(Constant::new(CommonOptions::default()));

let log_id = LogId::from(0);
let mut bifrost = Bifrost::init().await;

assert!(bifrost.get_trim_point(log_id).await?.is_none());

for _ in 1..=10 {
bifrost.append(log_id, Payload::default()).await?;
}

bifrost.trim(log_id, Lsn::from(5)).await?;

assert_eq!(
bifrost
.find_tail(log_id, FindTailAttributes::default())
.await?,
Some(Lsn::from(10))
);
assert_eq!(bifrost.get_trim_point(log_id).await?, Some(Lsn::from(5)));

for lsn in 0..5 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
offset: eq(Lsn::from(lsn + 1)),
record: pat!(Record::TrimGap(pat!(TrimGap {
until: eq(Lsn::from(5)),
})))
})))
)
}

for lsn in 5..10 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
offset: eq(Lsn::from(lsn + 1)),
record: pat!(Record::Data(_))
})))
);
}

// trimming beyond the release point will fall back to the release point
bifrost.trim(log_id, Lsn::from(u64::MAX)).await?;
assert_eq!(bifrost.get_trim_point(log_id).await?, Some(Lsn::from(10)));

for _ in 0..10 {
bifrost.append(log_id, Payload::default()).await?;
}

for lsn in 10..20 {
let record = bifrost.read_next_single_opt(log_id, Lsn::from(lsn)).await?;
assert_that!(
record,
pat!(Some(pat!(LogRecord {
offset: eq(Lsn::from(lsn + 1)),
record: pat!(Record::Data(_))
})))
);
}

Ok(())
})
.await
}
}
13 changes: 10 additions & 3 deletions crates/bifrost/src/loglet.rs
@@ -126,7 +126,9 @@ pub trait LogletBase: Send + Sync {

/// The offset of the slot **before** the first readable record (if it exists), or the offset
/// before the next slot that will be written to.
async fn get_trim_point(&self) -> Result<Self::Offset, Error>;
async fn get_trim_point(&self) -> Result<Option<Self::Offset>, Error>;

async fn trim(&self, trim_point: Self::Offset) -> Result<(), Error>;

/// Read or wait for the record at `from` offset, or the next available record if `from` isn't
/// defined for the loglet.
@@ -162,9 +164,14 @@ impl LogletBase for LogletWrapper {
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
}

async fn get_trim_point(&self) -> Result<Self::Offset, Error> {
async fn get_trim_point(&self) -> Result<Option<Lsn>, Error> {
let offset = self.loglet.get_trim_point().await?;
Ok(self.base_lsn.offset_by(offset))
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
}

async fn trim(&self, trim_point: Self::Offset) -> Result<(), Error> {
let trim_point = trim_point.into_offset(self.base_lsn);
self.loglet.trim(trim_point).await
}

async fn read_next_single(&self, after: Lsn) -> Result<LogRecord<Lsn, Bytes>, Error> {
85 changes: 72 additions & 13 deletions crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
@@ -48,7 +48,14 @@ pub struct LogStoreWriteCommand {
}

enum DataUpdate {
PutRecord { offset: LogletOffset, data: Bytes },
PutRecord {
offset: LogletOffset,
data: Bytes,
},
TrimLog {
old_trim_point: LogletOffset,
new_trim_point: LogletOffset,
},
}

pub(crate) struct LogStoreWriter {
@@ -84,6 +91,7 @@ impl LogStoreWriter {
"local-loglet-writer",
None,
async move {
debug!("Start running LogStoreWriter");
let opts = updateable.load();
let batch_size = std::cmp::max(1, opts.writer_batch_commit_count);
let batch_duration: Duration = opts.writer_batch_commit_duration.into();
@@ -154,6 +162,16 @@ impl LogStoreWriter {
offset,
data,
),
DataUpdate::TrimLog {
old_trim_point,
new_trim_point,
} => Self::trim_log(
&data_cf,
&mut write_batch,
command.log_id,
old_trim_point,
new_trim_point,
),
}
}

@@ -206,6 +224,24 @@ impl LogStoreWriter {
write_batch.put_cf(data_cf, &key.to_bytes(), data);
}

fn trim_log(
data_cf: &Arc<BoundColumnFamily>,
write_batch: &mut WriteBatch,
id: u64,
old_trim_point: LogletOffset,
new_trim_point: LogletOffset,
) {
// the old trim point has already been removed on the previous trim operation
let from = RecordKey::new(id, old_trim_point.next());
// the upper bound is exclusive for range deletions, therefore we need to increase it
let to = RecordKey::new(id, new_trim_point.next());

trace!("Trim log range: [{from:?}, {to:?})");
// We probably need to measure whether range delete is better than single deletes for
// multiple trim operations
write_batch.delete_range_cf(data_cf, &from.to_bytes(), &to.to_bytes());
}

async fn commit(&mut self, opts: &LocalLogletOptions, write_batch: WriteBatch) {
let mut write_opts = rocksdb::WriteOptions::new();
write_opts.disable_wal(opts.rocksdb.rocksdb_disable_wal());
@@ -288,23 +324,46 @@ impl RocksDbLogWriterHandle {
let data_updates = data_updates;
let log_state_updates =
Some(LogStateUpdates::default().update_release_pointer(start_offset.prev()));
if let Err(e) = self
.sender
.send(LogStoreWriteCommand {
log_id,
data_updates,
log_state_updates,
ack: Some(ack),
})
.await
{
self.send_command(LogStoreWriteCommand {
log_id,
data_updates,
log_state_updates,
ack: Some(ack),
})
.await?;
Ok(receiver)
}

pub async fn enqueue_trim(
&self,
log_id: u64,
old_trim_point: LogletOffset,
new_trim_point: LogletOffset,
) -> Result<(), ShutdownError> {
let mut data_updates = SmallVec::with_capacity(1);
data_updates.push(DataUpdate::TrimLog {
old_trim_point,
new_trim_point,
});
let log_state_updates = Some(LogStateUpdates::default().update_trim_point(new_trim_point));

self.send_command(LogStoreWriteCommand {
log_id,
data_updates,
log_state_updates,
ack: None,
})
.await
}

async fn send_command(&self, command: LogStoreWriteCommand) -> Result<(), ShutdownError> {
if let Err(e) = self.sender.send(command).await {
warn!(
"Local loglet writer task is gone, not accepting the record: {}",
e
);
return Err(ShutdownError);
}

Ok(receiver)
Ok(())
}
}