Drop ReplicationStrategy
Summary:

The idea here is that we can't use `ReplicationStrategy`
because it's not location aware; instead, we should describe
placement in terms of `ReplicationProperty`.

If the placement strategy is not set, the system assumes placement is
done on all nodes; otherwise, the number of copies described by the
replication property is used.

> Right now placement is still not location aware, but this will
most probably change in the future.
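
A minimal sketch of the rule described above, assuming the
`ReplicationProperty` API shown in the diff below (`pick_nodes` and its
signature are made up for illustration; this is not the actual scheduler
code):

use std::collections::HashSet;

use restate_types::replicated_loglet::ReplicationProperty;
use restate_types::PlainNodeId;

// Illustrative only: mirrors the new placement rule, not the real scheduler.
fn pick_nodes(
    placement_strategy: Option<&ReplicationProperty>,
    alive_workers: &HashSet<PlainNodeId>,
) -> HashSet<PlainNodeId> {
    match placement_strategy {
        // No strategy configured: place the partition on every alive worker.
        None => alive_workers.clone(),
        // A strategy is configured: cap the node set at the requested number
        // of copies (placement is not location aware yet).
        Some(property) => alive_workers
            .iter()
            .take(usize::from(property.num_copies()))
            .cloned()
            .collect(),
    }
}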
muhamadazmy committed Jan 20, 2025
1 parent c487db2 commit e030c46
Showing 15 changed files with 154 additions and 257 deletions.
18 changes: 10 additions & 8 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
@@ -12,6 +12,7 @@ use std::time::Duration;

use bytes::{Bytes, BytesMut};
use restate_types::protobuf::cluster::ClusterConfiguration;
use restate_types::replicated_loglet::ReplicationProperty;
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

@@ -299,8 +300,11 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
let response = GetClusterConfigurationResponse {
cluster_configuration: Some(ClusterConfiguration {
num_partitions: u32::from(partition_table.num_partitions()),
replication_strategy: Some(partition_table.replication_strategy().into()),
default_provider: Some(logs.configuration().default_provider.clone().into()),
partition_placement_strategy: partition_table
.placement_strategy()
.clone()
.map(Into::into),
bifrost_provider: Some(logs.configuration().default_provider.clone().into()),
}),
};

@@ -319,16 +323,14 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
self.controller_handle
.update_cluster_configuration(
request
.replication_strategy
.ok_or_else(|| {
Status::invalid_argument("replication_strategy is a required field")
})?
.try_into()
.partition_placement_strategy
.map(ReplicationProperty::try_from)
.transpose()
.map_err(|err| {
Status::invalid_argument(format!("invalid replication_strategy: {err}"))
})?,
request
.default_provider
.bifrost_provider
.ok_or_else(|| {
Status::invalid_argument("default_provider is a required field")
})?
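The conversion above follows a small generic pattern: an optional protobuf
field becomes an `Option` of the domain type, and a failed conversion is
surfaced as an error instead of being dropped. A hedged sketch of that
pattern (the helper is illustrative, not part of the codebase):

// Illustrative helper: convert an optional wire value into an optional
// domain value, propagating conversion errors.
fn parse_optional<P, T>(field: Option<P>) -> Result<Option<T>, T::Error>
where
    T: TryFrom<P>,
{
    // map: Option<P> -> Option<Result<T, _>>; transpose: -> Result<Option<T>, _>
    field.map(T::try_from).transpose()
}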
61 changes: 33 additions & 28 deletions crates/admin/src/cluster_controller/scheduler.rs
@@ -10,6 +10,7 @@

use rand::seq::IteratorRandom;
use restate_types::live::Pinned;
use restate_types::replicated_loglet::{LocationScope, ReplicationProperty};
use std::collections::BTreeMap;
use std::sync::Arc;
use tracing::debug;
@@ -28,9 +29,7 @@ use restate_types::net::partition_processor_manager::{
ControlProcessor, ControlProcessors, ProcessorCommand,
};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::{
Partition, PartitionPlacement, PartitionTable, ReplicationStrategy,
};
use restate_types::partition_table::{Partition, PartitionPlacement, PartitionTable};
use restate_types::{NodeId, PlainNodeId, Version};

use crate::cluster_controller::logs_controller;
@@ -139,15 +138,15 @@ impl<T: TransportConnect> Scheduler<T> {
// todo(azmy): avoid cloning the partition table every time by keeping
// the latest built always available as a field
let mut builder = partition_table.clone().into_builder();
let replication_strategy = builder.replication_strategy();
let replication_strategy = builder.placement_strategy().clone();

builder.for_each(|partition_id, placement| {
let mut target_state = TargetPartitionPlacementState::new(placement);
self.ensure_replication(
partition_id,
&mut target_state,
alive_workers,
replication_strategy,
replication_strategy.clone(),
nodes_config,
&placement_hints,
);
@@ -234,7 +233,7 @@ impl<T: TransportConnect> Scheduler<T> {
partition_id: &PartitionId,
target_state: &mut TargetPartitionPlacementState,
alive_workers: &HashSet<PlainNodeId>,
replication_strategy: ReplicationStrategy,
placement_strategy: Option<ReplicationProperty>,
nodes_config: &NodesConfiguration,
placement_hints: &H,
) {
@@ -243,17 +242,22 @@ impl<T: TransportConnect> Scheduler<T> {
.node_set
.retain(|node_id| alive_workers.contains(node_id));

match replication_strategy {
ReplicationStrategy::OnAllNodes => {
match placement_strategy {
None => {
// The extend will only add the new nodes that
// don't exist in the node set.
// The retain done above makes sure alive nodes in the set
// keep their initial order.
target_state.node_set.extend(alive_workers.iter().cloned());
}
ReplicationStrategy::Factor(replication_factor) => {
let replication_factor =
usize::try_from(replication_factor.get()).expect("u32 should fit into usize");
Some(replication_factor) => {
assert_eq!(
replication_factor.greatest_defined_scope(),
LocationScope::Node,
"Location aware partition replication is not supported yet"
);

let replication_factor = usize::from(replication_factor.num_copies());

if target_state.node_set.len() == replication_factor {
return;
@@ -537,6 +541,7 @@ mod tests {
use http::Uri;
use rand::prelude::ThreadRng;
use rand::Rng;
use restate_types::replicated_loglet::ReplicationProperty;
use std::collections::BTreeMap;
use std::iter;
use std::num::NonZero;
@@ -565,7 +570,7 @@
LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState,
};
use restate_types::partition_table::{
PartitionPlacement, PartitionTable, PartitionTableBuilder, ReplicationStrategy,
PartitionPlacement, PartitionTable, PartitionTableBuilder,
};
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, PlainNodeId, Version};
@@ -618,21 +623,21 @@

#[test(restate_core::test(start_paused = true))]
async fn schedule_partitions_with_replication_factor() -> googletest::Result<()> {
schedule_partitions(ReplicationStrategy::Factor(
schedule_partitions(Some(ReplicationProperty::new(
NonZero::new(3).expect("non-zero"),
))
)))
.await?;
Ok(())
}

#[test(restate_core::test(start_paused = true))]
async fn schedule_partitions_with_all_nodes_replication() -> googletest::Result<()> {
schedule_partitions(ReplicationStrategy::OnAllNodes).await?;
schedule_partitions(None).await?;
Ok(())
}

async fn schedule_partitions(
replication_strategy: ReplicationStrategy,
placement_strategy: Option<ReplicationProperty>,
) -> googletest::Result<()> {
let num_partitions = 64;
let num_nodes = 5;
@@ -687,7 +692,8 @@ mod tests {
let mut partition_table_builder =
PartitionTable::with_equally_sized_partitions(Version::MIN, num_partitions)
.into_builder();
partition_table_builder.set_replication_strategy(replication_strategy);
partition_table_builder.set_placement_strategy(placement_strategy.clone());

let partition_table = partition_table_builder.build();

let metadata_store_client = builder.metadata_store_client.clone();
@@ -742,27 +748,26 @@ mod tests {
for (_, partition) in target_partition_table.partitions_mut() {
let target_state = TargetPartitionPlacementState::new(&mut partition.placement);
// assert that the replication strategy was respected
match replication_strategy {
ReplicationStrategy::OnAllNodes => {
match &placement_strategy {
None => {
// assert that every partition has a leader which is part of the alive nodes set
assert!(target_state.contains_all(&alive_nodes));

assert!(target_state
.leader
.is_some_and(|leader| alive_nodes.contains(&leader)));
}
ReplicationStrategy::Factor(replication_factor) => {
Some(replication_property) => {
// assert that every partition has a leader which is part of the alive nodes set
assert!(target_state
.leader
.is_some_and(|leader| alive_nodes.contains(&leader)));

assert_eq!(
target_state.node_set.len(),
alive_nodes.len().min(
usize::try_from(replication_factor.get())
.expect("u32 fits into usize")
)
alive_nodes
.len()
.min(usize::from(replication_property.num_copies()))
);
}
}
@@ -785,7 +790,7 @@

let partition_table = run_ensure_replication_test(
partition_table_builder,
ReplicationStrategy::Factor(num_partition_processors),
Some(ReplicationProperty::new(num_partition_processors)),
)
.await?;
let partition = partition_table
@@ -817,7 +822,7 @@ mod tests {

let partition_table = run_ensure_replication_test(
partition_table_builder,
ReplicationStrategy::Factor(num_partition_processors),
Some(ReplicationProperty::new(num_partition_processors)),
)
.await?;
let partition = partition_table
@@ -834,7 +839,7 @@

async fn run_ensure_replication_test(
mut partition_table_builder: PartitionTableBuilder,
replication_strategy: ReplicationStrategy,
placement_strategy: Option<ReplicationProperty>,
) -> googletest::Result<PartitionTable> {
let env = TestCoreEnv::create_with_single_node(0, 0).await;

@@ -858,7 +863,7 @@ mod tests {
partition_id,
&mut target_state,
&alive_workers,
replication_strategy,
placement_strategy.clone(),
&nodes_config,
&NoPlacementHints,
);
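For reference, a minimal sketch of the placement strategy the tests above
construct: a plain replication factor with no location awareness, which is
the only shape the scheduler currently accepts (the function is illustrative
and assumes the `ReplicationProperty` API used in the diff):

use std::num::NonZero;

use restate_types::replicated_loglet::{LocationScope, ReplicationProperty};

// Illustrative only: three copies, defined at node scope.
fn example_placement_strategy() -> Option<ReplicationProperty> {
    let property = ReplicationProperty::new(NonZero::new(3).expect("non-zero"));

    // The scheduler currently rejects anything scoped above the node level.
    assert_eq!(property.greatest_defined_scope(), LocationScope::Node);
    assert_eq!(usize::from(property.num_copies()), 3);

    Some(property)
}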
20 changes: 9 additions & 11 deletions crates/admin/src/cluster_controller/service.rs
@@ -27,10 +27,8 @@ use restate_types::logs::metadata::{
LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex,
};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
use restate_types::partition_table::{
self, PartitionTable, PartitionTableBuilder, ReplicationStrategy,
};
use restate_types::replicated_loglet::ReplicatedLogletParams;
use restate_types::partition_table::{self, PartitionTable, PartitionTableBuilder};
use restate_types::replicated_loglet::{ReplicatedLogletParams, ReplicationProperty};

use restate_bifrost::{Bifrost, SealedSegment};
use restate_core::network::rpc_router::RpcRouter;
@@ -172,7 +170,7 @@ enum ClusterControllerCommand {
response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
},
UpdateClusterConfiguration {
replication_strategy: ReplicationStrategy,
placement_strategy: Option<ReplicationProperty>,
default_provider: ProviderConfiguration,
response_tx: oneshot::Sender<anyhow::Result<()>>,
},
@@ -237,15 +235,15 @@ impl ClusterControllerHandle {

pub async fn update_cluster_configuration(
&self,
replication_strategy: ReplicationStrategy,
placement_strategy: Option<ReplicationProperty>,
default_provider: ProviderConfiguration,
) -> Result<anyhow::Result<()>, ShutdownError> {
let (response_tx, response_rx) = oneshot::channel();

let _ = self
.tx
.send(ClusterControllerCommand::UpdateClusterConfiguration {
replication_strategy,
placement_strategy,
default_provider,
response_tx,
})
@@ -389,7 +387,7 @@ impl<T: TransportConnect> Service<T> {

async fn update_cluster_configuration(
&self,
replication_strategy: ReplicationStrategy,
placement_strategy: Option<ReplicationProperty>,
default_provider: ProviderConfiguration,
) -> anyhow::Result<()> {
let logs = self
@@ -446,8 +444,8 @@ impl<T: TransportConnect> Service<T> {

let mut builder: PartitionTableBuilder = partition_table.into();

if builder.replication_strategy() != replication_strategy {
builder.set_replication_strategy(replication_strategy);
if builder.placement_strategy() != &placement_strategy {
builder.set_placement_strategy(placement_strategy.clone());
}

builder
@@ -522,7 +520,7 @@ impl<T: TransportConnect> Service<T> {
.await;
}
ClusterControllerCommand::UpdateClusterConfiguration {
replication_strategy,
placement_strategy: replication_strategy,
default_provider,
response_tx,
} => {
11 changes: 6 additions & 5 deletions crates/core/protobuf/node_ctl_svc.proto
@@ -22,17 +22,18 @@ service NodeCtlSvc {
rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);

// Provision the Restate cluster on this node.
rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse);
rpc ProvisionCluster(ProvisionClusterRequest)
returns (ProvisionClusterResponse);
}

message ProvisionClusterRequest {
bool dry_run = 1;
// if unset then the configured cluster num partitions will be used
optional uint32 num_partitions = 2;
// if unset then the configured cluster placement strategy will be used
optional restate.cluster.ReplicationStrategy placement_strategy = 3;
optional restate.cluster.ReplicationProperty partition_placement_strategy = 3;
// if unset then the configured cluster default log provider will be used
optional restate.cluster.DefaultProvider log_provider = 4;
optional restate.cluster.BifrostProvider log_provider = 4;
}

message ProvisionClusterResponse {
Expand Down Expand Up @@ -60,8 +61,8 @@ message IdentResponse {
}

message GetMetadataRequest {
// If set, we'll first sync with metadata store to esnure we are returning the latest value.
// Otherwise, we'll return the local value on this node.
// If set, we'll first sync with metadata store to esnure we are returning the
// latest value. Otherwise, we'll return the local value on this node.
bool sync = 1;
restate.common.MetadataKind kind = 2;
}
6 changes: 3 additions & 3 deletions crates/local-cluster-runner/src/node/mod.rs
@@ -18,7 +18,7 @@ use restate_core::network::net_util::create_tonic_channel;
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest as ProtoProvisionClusterRequest;
use restate_types::logs::metadata::ProviderConfiguration;
use restate_types::partition_table::ReplicationStrategy;
use restate_types::protobuf::cluster::ReplicationProperty;
use restate_types::retries::RetryPolicy;
use restate_types::{
config::{Configuration, MetadataStoreClient},
@@ -759,7 +759,7 @@ impl StartedNode {
pub async fn provision_cluster(
&self,
num_partitions: Option<NonZeroU16>,
placement_strategy: Option<ReplicationStrategy>,
partition_placement_strategy: Option<ReplicationProperty>,
log_provider: Option<ProviderConfiguration>,
) -> anyhow::Result<bool> {
let channel = create_tonic_channel(
@@ -770,7 +770,7 @@ impl StartedNode {
let request = ProtoProvisionClusterRequest {
dry_run: false,
num_partitions: num_partitions.map(|num| u32::from(num.get())),
placement_strategy: placement_strategy.map(Into::into),
partition_placement_strategy: partition_placement_strategy.map(Into::into),
log_provider: log_provider.map(|log_provider| log_provider.into()),
};

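A hedged usage sketch of the updated signature (the crate path and the
surrounding test harness are assumptions, and the domain-to-protobuf
conversion relies on the `Into` impl used by the gRPC handler above):

use std::num::{NonZero, NonZeroU16};

use restate_local_cluster_runner::node::StartedNode;
use restate_types::replicated_loglet::ReplicationProperty;

// Illustrative only: provision two partitions with a replication factor of
// two instead of the old OnAllNodes strategy.
async fn provision_with_factor(node: &StartedNode) -> anyhow::Result<bool> {
    let factor = ReplicationProperty::new(NonZero::new(2).expect("non-zero"));
    node.provision_cluster(
        NonZeroU16::new(2),  // num_partitions
        Some(factor.into()), // partition_placement_strategy (protobuf type)
        None,                // fall back to the configured log provider
    )
    .await
}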