Skip to content

Commit

Permalink
[Locality] Location-aware NodeSetChecker
Browse files Browse the repository at this point in the history
This introduces a new sophisticated and performance-optimized nodeset checker for bifrost flexible quorums. It uses Arc+RwLock (parking-lot) to make this compatible with multi-threaded tokio runtimes. I initially designed it with Rc<RefCell<..>> and an `unsafe impl Send` but after thinking long and hard about it, there was no easy way to ensure that the safety guarantees won't break with future refactoring. Parking lot's RwLock has excellent performance in uncontended case which is _always_ going to be the case here.

The new nodeset checker has extensive test coverage
  • Loading branch information
AhmedSoliman committed Jan 16, 2025
1 parent 0c93812 commit 2c47453
Show file tree
Hide file tree
Showing 18 changed files with 1,458 additions and 298 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<'a> NodeSetSelector<'a> {
rng: &mut R,
preferred_nodes: &NodeSet,
) -> Result<NodeSet, NodeSelectionError> {
if replication_property.at_greatest_scope().0 != &LocationScope::Node {
if replication_property.greatest_defined_scope() > LocationScope::Node {
// todo: add support for other location scopes
unimplemented!("only node-scoped replication is currently supported");
}
Expand Down
1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ rocksdb = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
smallvec = { workspace = true }
smartstring = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<'a> NodeSetSelector<'a> {
rng: &mut R,
preferred_nodes: &NodeSet,
) -> Result<NodeSet, NodeSelectionError> {
if replication_property.at_greatest_scope().0 != &LocationScope::Node {
if replication_property.greatest_defined_scope() > LocationScope::Node {
// todo: add support for other location scopes
unimplemented!("only node-scoped replication is currently supported");
}
Expand Down
1,616 changes: 1,359 additions & 257 deletions crates/bifrost/src/providers/replicated_loglet/replication/checker.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod checker;
pub mod spread_selector;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl SpreadSelector {
// validate that we can have write quorum with this spread
let mut checker =
NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property);
checker.set_attribute_on_each(&selected, || true);
checker.set_attribute_on_each(selected.iter().copied(), true);
if !checker.check_write_quorum(|attr| *attr) {
return Err(SpreadSelectorError::InsufficientWriteableNodes);
}
Expand Down Expand Up @@ -153,7 +153,7 @@ impl SpreadSelector {
// validate that we can have write quorum with this spread
let mut checker =
NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property);
checker.set_attribute_on_each(&selected, || true);
checker.set_attribute_on_each(selected.iter().copied(), true);
if !checker.check_write_quorum(|attr| *attr) {
return Err(SpreadSelectorError::InsufficientWriteableNodes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ impl<T: TransportConnect> SequencerAppender<T> {
}
}

#[derive(Default, Debug)]
#[derive(Default, Debug, Eq, PartialEq, Hash, Clone)]
struct NodeAttributes {
committed: bool,
sealed: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl CheckSealTask {
&networking.metadata().nodes_config_ref(),
);

let mut nodeset_checker = NodeSetChecker::<'_, NodeTailStatus>::new(
let mut nodeset_checker = NodeSetChecker::<NodeTailStatus>::new(
&effective_nodeset,
&networking.metadata().nodes_config_ref(),
&my_params.replication,
Expand All @@ -80,6 +80,7 @@ impl CheckSealTask {
effective_nodeset = %effective_nodeset,
"Checking seal status for loglet",
);

loop {
if nodeset_checker
.check_fmajority(NodeTailStatus::is_known_unsealed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl Digests {
self.spread_selector.replication_property(),
);
// record is already replicated on those nodes
replication_checker.set_attribute_on_each(known_copies, || true);
replication_checker.set_attribute_on_each(known_copies.iter().copied(), true);

let payloads = vec![record].into();

Expand Down Expand Up @@ -306,7 +306,7 @@ impl Digests {
nodes_config,
self.spread_selector.replication_property(),
);
checker.set_attribute_on_each(&self.known_nodes, || true);
checker.set_attribute_on_each(self.known_nodes.iter().copied(), true);
checker.check_write_quorum(|known| *known)
}

Expand All @@ -324,8 +324,7 @@ impl Digests {
);
// walk backwards
while let Some((offset, nodes)) = range.next_back() {
checker.reset_with_default();
checker.set_attribute_on_each(nodes, || true);
checker.set_attribute_on_each(nodes.iter().copied(), true);
if checker.check_write_quorum(|known| *known) {
// this offset is good, advance to the next one
self.update_start_offset(offset.next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl<T: TransportConnect> FindTailTask<T> {

// We'll only refresh our view of the effective nodeset if we retry the find-tail
// procedure.
let mut nodeset_checker = NodeSetChecker::<'_, NodeTailStatus>::new(
let mut nodeset_checker = NodeSetChecker::<NodeTailStatus>::new(
&effective_nodeset,
&self.networking.metadata().nodes_config_ref(),
&self.my_params.replication,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<'a> GetTrimPointTask<'a> {
});
}

let mut nodeset_checker = NodeSetChecker::<'_, Option<LogletOffset>>::new(
let mut nodeset_checker = NodeSetChecker::<Option<LogletOffset>>::new(
&effective_nodeset,
&networking.metadata().nodes_config_ref(),
&self.my_params.replication,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use trim::*;
use restate_types::logs::LogletOffset;
use restate_types::Merge;

#[derive(Debug, Default, PartialEq, Eq, derive_more::Display)]
#[derive(Debug, Clone, Hash, Default, PartialEq, Eq, derive_more::Display)]
enum NodeTailStatus {
#[default]
#[display("Unknown")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl SealTask {

let (tx, mut rx) = mpsc::unbounded_channel();

let mut nodeset_checker = NodeSetChecker::<'_, bool>::new(
let mut nodeset_checker = NodeSetChecker::<bool>::new(
&effective_nodeset,
&networking.metadata().nodes_config_ref(),
&self.my_params.replication,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<'a> TrimTask<'a> {
return Ok(());
}

let mut nodeset_checker = NodeSetChecker::<'_, bool>::new(
let mut nodeset_checker = NodeSetChecker::<bool>::new(
&effective_nodeset,
&networking.metadata().nodes_config_ref(),
&self.my_params.replication,
Expand Down
20 changes: 19 additions & 1 deletion crates/types/src/nodes_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ impl NodesConfiguration {
})
}

/// Iterate over nodes with a given role
pub fn iter_role(&self, role: Role) -> impl Iterator<Item = (PlainNodeId, &'_ NodeConfig)> {
self.nodes.iter().filter_map(move |(k, v)| match v {
MaybeNode::Node(node) if node.has_role(role) => Some((*k, node)),
_ => None,
})
}

/// Iterate over all non-tombstone nodes
pub fn iter(&self) -> impl Iterator<Item = (PlainNodeId, &'_ NodeConfig)> {
self.nodes.iter().filter_map(|(k, v)| {
if let MaybeNode::Node(node) = v {
Expand Down Expand Up @@ -295,7 +304,8 @@ pub enum StorageState {
/// can write to: yes
ReadWrite,
/// Node detected that some/all of its local storage has been deleted and it cannot be used
/// as authoritative source for quorum-dependent queries.
/// as authoritative source for quorum-dependent queries. Some data might have permanently been
/// lost.
///
/// should read from: yes (non-quorum reads)
/// can write to: no
Expand All @@ -319,6 +329,14 @@ impl StorageState {
}
}

pub fn is_authoritative(&self) -> bool {
use StorageState::*;
match self {
Provisioning | Disabled | DataLoss => false,
ReadOnly | ReadWrite => true,
}
}

/// Empty nodes are automatically excluded from node sets.
pub fn empty(&self) -> bool {
matches!(self, StorageState::Provisioning | StorageState::Disabled)
Expand Down
4 changes: 2 additions & 2 deletions crates/types/src/replicated_loglet/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ impl NodeSet {
Self(HashSet::new())
}

pub fn from_single(node: PlainNodeId) -> Self {
pub fn from_single(node: impl Into<PlainNodeId>) -> Self {
let mut set = HashSet::new();
set.insert(node);
set.insert(node.into());
Self(set)
}

Expand Down
83 changes: 60 additions & 23 deletions crates/types/src/replicated_loglet/replication_property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use anyhow::Context;
use enum_map::Enum;
use regex::Regex;

use crate::locality::NodeLocationScope;

static REPLICATION_PROPERTY_PATTERN: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r"^(?i)\{\s*(?<scopes>(?:node|zone|region)\s*:\s*\d+(?:\s*,\s*(?:node|zone|region)\s*:\s*\d+)*)\s*}$",
Expand Down Expand Up @@ -69,6 +71,27 @@ impl LocationScope {
}
}

impl From<NodeLocationScope> for LocationScope {
fn from(scope: NodeLocationScope) -> Self {
match scope {
NodeLocationScope::Node => Self::Node,
NodeLocationScope::Zone => Self::Zone,
NodeLocationScope::Region => Self::Region,
NodeLocationScope::Root => panic!("Root is not a valid location scope"),
}
}
}

impl From<LocationScope> for NodeLocationScope {
fn from(scope: LocationScope) -> Self {
match scope {
LocationScope::Node => Self::Node,
LocationScope::Zone => Self::Zone,
LocationScope::Region => Self::Region,
}
}
}

#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct ReplicationPropertyError(String);
Expand Down Expand Up @@ -119,6 +142,7 @@ impl ReplicationProperty {
Ok(self)
}

/// Total number of copies required to satisfy the replication property
pub fn num_copies(&self) -> u8 {
*self
.0
Expand All @@ -127,20 +151,26 @@ impl ReplicationProperty {
.1
}

pub fn at_scope_or_greater(&self, scope: LocationScope) -> Option<(&LocationScope, &u8)> {
self.0.range(scope..).next()
}

pub fn at_smallest_scope(&self) -> (&LocationScope, &u8) {
self.0
.first_key_value()
.expect("must have at least one scope")
/// How many copies are required at this location scope.
/// Returns None if no copies are defined at the given scope.
/// For instance {zone: 2, node: 3} replication will return None at region scope.
///
/// Note that it's guaranteed to get a value for replication at node-level scope.
pub fn copies_at_scope(&self, scope: impl Into<LocationScope>) -> Option<u8> {
let scope = scope.into();
if scope == LocationScope::MIN {
Some(self.num_copies())
} else {
self.0.get(&scope).copied()
}
}

pub fn at_greatest_scope(&self) -> (&LocationScope, &u8) {
self.0
pub fn greatest_defined_scope(&self) -> LocationScope {
*self
.0
.last_key_value()
.expect("must have at least one scope")
.0
}
}

Expand Down Expand Up @@ -238,25 +268,26 @@ mod tests {
fn test_replication_property() -> Result<()> {
let mut r = ReplicationProperty::new(NonZeroU8::new(4).unwrap());
assert_that!(r.num_copies(), eq(4));
assert_that!(r.at_greatest_scope(), eq((&LocationScope::Node, &4)));
assert_that!(
r.at_scope_or_greater(LocationScope::Node),
some(eq((&LocationScope::Node, &4)))
);
assert_that!(r.greatest_defined_scope(), eq(LocationScope::Node));

assert_that!(r.copies_at_scope(LocationScope::Node), some(eq(4)));
assert_that!(r.copies_at_scope(LocationScope::Zone), none());
assert_that!(r.copies_at_scope(LocationScope::Region), none());

r.set_scope(LocationScope::Region, NonZeroU8::new(2).unwrap())?;
assert_that!(r.num_copies(), eq(4));
assert_that!(
r.at_scope_or_greater(LocationScope::Zone),
some(eq((&LocationScope::Region, &2)))
);

assert_that!(r.copies_at_scope(LocationScope::Node), some(eq(4)));
assert_that!(r.copies_at_scope(LocationScope::Zone), none());
assert_that!(r.copies_at_scope(LocationScope::Region), some(eq(2)));

r.set_scope(LocationScope::Zone, NonZeroU8::new(2).unwrap())?;
assert_that!(r.num_copies(), eq(4));
assert_that!(
r.at_scope_or_greater(LocationScope::Zone),
some(eq((&LocationScope::Zone, &2)))
);

assert_that!(r.copies_at_scope(LocationScope::Node), some(eq(4)));
assert_that!(r.copies_at_scope(LocationScope::Zone), some(eq(2)));
assert_that!(r.copies_at_scope(LocationScope::Region), some(eq(2)));

Ok(())
}

Expand All @@ -275,6 +306,12 @@ mod tests {
.unwrap();
assert_that!(r, ok(eq(expected)));

let r: ReplicationProperty = "{zone: 2}".parse().unwrap();
assert_that!(r.num_copies(), eq(2));

assert_that!(r.copies_at_scope(LocationScope::Node), some(eq(2)));
assert_that!(r.copies_at_scope(LocationScope::Zone), some(eq(2)));
assert_that!(r.copies_at_scope(LocationScope::Region), none());
Ok(())
}
}

0 comments on commit 2c47453

Please sign in to comment.