Skip to content

Commit

Permalink
[Bifrost] bifrost loglet tailing read streams
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 28, 2024
1 parent 70acd6f commit f3ff9e7
Show file tree
Hide file tree
Showing 19 changed files with 785 additions and 222 deletions.
23 changes: 5 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ hyper = { version = "0.14.24", default-features = false }
hyper-rustls = { version = "0.24.1", features = ["http2"] }
itertools = "0.11.0"
metrics = { version = "0.22" }
metrics-exporter-prometheus = { version = "0.14", default-features = false, features = ["async-runtime"] }
once_cell = "1.18"
opentelemetry = { version = "0.22.0" }
opentelemetry-http = { version = "0.11.1" }
Expand Down
4 changes: 3 additions & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures = { workspace = true }
humantime = { workspace = true }
metrics = { workspace = true }
once_cell = { workspace = true }
pin-project = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
Expand All @@ -39,7 +40,8 @@ strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true }
tracing = { workspace = true }


Expand Down
82 changes: 49 additions & 33 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use restate_types::Version;

use crate::loglet::{LogletBase, LogletProvider, LogletWrapper};
use crate::watchdog::{WatchdogCommand, WatchdogSender};
use crate::{Error, FindTailAttributes, LogReadStream, LogRecord, SMALL_BATCH_THRESHOLD_COUNT};
use crate::{
Error, FindTailAttributes, LogReadStream, LogRecord, Result, SMALL_BATCH_THRESHOLD_COUNT,
};

/// Bifrost is Restate's durable interconnect system
///
Expand Down Expand Up @@ -65,27 +67,23 @@ impl Bifrost {
/// Appends a single record to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]
#[instrument(level = "debug", skip(self, payload), err)]
pub async fn append(&mut self, log_id: LogId, payload: Payload) -> Result<Lsn, Error> {
pub async fn append(&mut self, log_id: LogId, payload: Payload) -> Result<Lsn> {
self.inner.append(log_id, payload).await
}

/// Appends a batch of records to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]. The returned Lsn is the Lsn of the first
/// record in this batch. This will only return after all records have been stored.
#[instrument(level = "debug", skip(self, payloads), err)]
pub async fn append_batch(
&mut self,
log_id: LogId,
payloads: &[Payload],
) -> Result<Lsn, Error> {
pub async fn append_batch(&mut self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
self.inner.append_batch(log_id, payloads).await
}

/// Read the next record after the LSN provided. The `start` indicates the LSN where we will
/// read after. This means that the record returned will have a LSN strictly greater than
/// `after`. If no records are committed yet after this LSN, this read operation will "wait"
/// for such records to appear.
pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord, Error> {
pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord> {
self.inner.read_next_single(log_id, after).await
}

Expand All @@ -96,12 +94,19 @@ impl Bifrost {
&self,
log_id: LogId,
after: Lsn,
) -> Result<Option<LogRecord>, Error> {
) -> Result<Option<LogRecord>> {
self.inner.read_next_single_opt(log_id, after).await
}

pub fn create_reader(&self, log_id: LogId, after: Lsn) -> LogReadStream {
LogReadStream::new(self.inner.clone(), log_id, after)
/// Create a read stream. Until is inclusive. Pass [[`Lsn::Max`]] for a tailing stream. Use
/// Lsn::INVALID in _after_ to read from the start (head) of the log.
pub async fn create_reader(
&self,
log_id: LogId,
after: Lsn,
until: Lsn,
) -> Result<LogReadStream> {
LogReadStream::create(self.inner.clone(), log_id, after, until).await
}

/// Finds the current readable tail LSN of a log.
Expand All @@ -110,8 +115,8 @@ impl Bifrost {
&self,
log_id: LogId,
attributes: FindTailAttributes,
) -> Result<Option<Lsn>, Error> {
self.inner.find_tail(log_id, attributes).await
) -> Result<Option<Lsn>> {
Ok(self.inner.find_tail(log_id, attributes).await?.1)
}

/// The lsn of the slot **before** the first readable record (if it exists), or the offset
Expand All @@ -138,16 +143,22 @@ impl Bifrost {

/// Read a full log with the given id. To be used only in tests!!!
#[cfg(any(test, feature = "test-util"))]
pub async fn read_all(&self, log_id: LogId) -> Result<Vec<LogRecord>, Error> {
pub async fn read_all(&self, log_id: LogId) -> Result<Vec<LogRecord>> {
use futures::TryStreamExt;

self.inner.fail_if_shutting_down()?;

let mut v = vec![];
let mut reader = self.create_reader(log_id, Lsn::INVALID);
while let Some(r) = reader.read_next_opt().await? {
v.push(r);
}
let current_tail = self
.find_tail(log_id, FindTailAttributes::default())
.await?;
let Some(current_tail) = current_tail else {
return Ok(Vec::default());
};

Ok(v)
let reader = self
.create_reader(log_id, Lsn::INVALID, current_tail)
.await?;
reader.try_collect().await
}
}

Expand Down Expand Up @@ -182,15 +193,15 @@ impl BifrostInner {

/// Appends a single record to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]
pub async fn append(&self, log_id: LogId, payload: Payload) -> Result<Lsn, Error> {
pub async fn append(&self, log_id: LogId, payload: Payload) -> Result<Lsn> {
self.fail_if_shutting_down()?;
let loglet = self.writeable_loglet(log_id).await?;
let mut buf = BytesMut::default();
StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
loglet.append(buf.freeze()).await
}

pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn, Error> {
pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
let loglet = self.writeable_loglet(log_id).await?;
let raw_payloads: SmallVec<[_; SMALL_BATCH_THRESHOLD_COUNT]> = payloads
.iter()
Expand All @@ -204,7 +215,7 @@ impl BifrostInner {
loglet.append_batch(&raw_payloads).await
}

pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord, Error> {
pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
Expand All @@ -219,7 +230,7 @@ impl BifrostInner {
&self,
log_id: LogId,
after: Lsn,
) -> Result<Option<LogRecord>, Error> {
) -> Result<Option<LogRecord>> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
Expand All @@ -234,10 +245,11 @@ impl BifrostInner {
&self,
log_id: LogId,
_attributes: FindTailAttributes,
) -> Result<Option<Lsn>, Error> {
) -> Result<(LogletWrapper, Option<Lsn>)> {
self.fail_if_shutting_down()?;
let loglet = self.writeable_loglet(log_id).await?;
loglet.find_tail().await
let tail = loglet.find_tail().await?;
Ok((loglet, tail))
}

async fn get_trim_point(&self, log_id: LogId) -> Result<Option<Lsn>, Error> {
Expand Down Expand Up @@ -291,7 +303,7 @@ impl BifrostInner {
}

#[inline]
fn fail_if_shutting_down(&self) -> Result<(), Error> {
fn fail_if_shutting_down(&self) -> Result<()> {
if self.shutting_down.load(Ordering::Relaxed) {
Err(Error::Shutdown(restate_core::ShutdownError))
} else {
Expand All @@ -300,7 +312,7 @@ impl BifrostInner {
}

/// Immediately fetch new metadata from metadata store.
pub async fn sync_metadata(&self) -> Result<(), Error> {
pub async fn sync_metadata(&self) -> Result<()> {
self.fail_if_shutting_down()?;
self.metadata
.sync(MetadataKind::Logs)
Expand Down Expand Up @@ -342,7 +354,7 @@ impl BifrostInner {
assert!(self.providers[kind].try_insert(provider).is_ok());
}

async fn writeable_loglet(&self, log_id: LogId) -> Result<LogletWrapper, Error> {
async fn writeable_loglet(&self, log_id: LogId) -> Result<LogletWrapper> {
let tail_segment = self
.metadata
.logs()
Expand All @@ -351,7 +363,11 @@ impl BifrostInner {
self.get_loglet(&tail_segment).await
}

async fn find_loglet_for_lsn(&self, log_id: LogId, lsn: Lsn) -> Result<LogletWrapper, Error> {
pub(crate) async fn find_loglet_for_lsn(
&self,
log_id: LogId,
lsn: Lsn,
) -> Result<LogletWrapper> {
let segment = self
.metadata
.logs()
Expand Down Expand Up @@ -390,7 +406,7 @@ mod tests {

#[tokio::test]
#[traced_test]
async fn test_append_smoke() -> Result<()> {
async fn test_append_smoke() -> googletest::Result<()> {
let num_partitions = 5;
let node_env = TestCoreEnvBuilder::new_with_mock_network()
.with_partition_table(FixedPartitionTable::new(Version::MIN, num_partitions))
Expand Down Expand Up @@ -464,7 +480,7 @@ mod tests {
}

#[tokio::test(start_paused = true)]
async fn test_lazy_initialization() -> Result<()> {
async fn test_lazy_initialization() -> googletest::Result<()> {
let node_env = TestCoreEnv::create_with_mock_nodes_config(1, 1).await;
let tc = node_env.tc;
tc.run_in_scope("test", None, async {
Expand All @@ -491,7 +507,7 @@ mod tests {
}

#[test(tokio::test)]
async fn trim_log_smoke_test() -> Result<()> {
async fn trim_log_smoke_test() -> googletest::Result<()> {
let node_env = TestCoreEnvBuilder::new_with_mock_network()
.set_provider_kind(ProviderKind::Local)
.build()
Expand Down
6 changes: 4 additions & 2 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@

use restate_core::{ShutdownError, SyncError};
use std::sync::Arc;
use thiserror::Error;

use restate_types::logs::{LogId, Lsn};

use crate::loglets::local_loglet::LogStoreError;
use crate::types::SealReason;

#[derive(Error, Debug, Clone)]
/// Result type for bifrost operations.
pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
#[error("log '{0}' is sealed")]
LogSealed(LogId, SealReason),
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod types;
mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError};
pub use error::{Error, ProviderError, Result};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
Expand Down
Loading

0 comments on commit f3ff9e7

Please sign in to comment.