diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs
index 5d012c23c..e1beed71e 100644
--- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs
+++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs
@@ -12,6 +12,7 @@ use std::time::Duration;
 
 use bytes::{Bytes, BytesMut};
 use restate_types::protobuf::cluster::ClusterConfiguration;
+use restate_types::replicated_loglet::ReplicationProperty;
 use tonic::{async_trait, Request, Response, Status};
 use tracing::info;
@@ -299,8 +300,11 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
         let response = GetClusterConfigurationResponse {
             cluster_configuration: Some(ClusterConfiguration {
                 num_partitions: u32::from(partition_table.num_partitions()),
-                replication_strategy: Some(partition_table.replication_strategy().into()),
-                default_provider: Some(logs.configuration().default_provider.clone().into()),
+                partition_placement_strategy: partition_table
+                    .placement_strategy()
+                    .clone()
+                    .map(Into::into),
+                bifrost_provider: Some(logs.configuration().default_provider.clone().into()),
             }),
         };
@@ -319,16 +323,14 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
         self.controller_handle
             .update_cluster_configuration(
                 request
-                    .replication_strategy
-                    .ok_or_else(|| {
-                        Status::invalid_argument("replication_strategy is a required field")
-                    })?
-                    .try_into()
+                    .partition_placement_strategy
+                    .map(ReplicationProperty::try_from)
+                    .transpose()
                     .map_err(|err| {
                         Status::invalid_argument(format!("invalid replication_strategy: {err}"))
                     })?,
                 request
-                    .default_provider
+                    .bifrost_provider
                     .ok_or_else(|| {
                         Status::invalid_argument("default_provider is a required field")
                     })?
diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs
index e4c8b4a96..3905f81dc 100644
--- a/crates/admin/src/cluster_controller/scheduler.rs
+++ b/crates/admin/src/cluster_controller/scheduler.rs
@@ -10,6 +10,7 @@
 
 use rand::seq::IteratorRandom;
 use restate_types::live::Pinned;
+use restate_types::replicated_loglet::{LocationScope, ReplicationProperty};
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use tracing::debug;
@@ -28,9 +29,7 @@ use restate_types::net::partition_processor_manager::{
     ControlProcessor, ControlProcessors, ProcessorCommand,
 };
 use restate_types::nodes_config::NodesConfiguration;
-use restate_types::partition_table::{
-    Partition, PartitionPlacement, PartitionTable, ReplicationStrategy,
-};
+use restate_types::partition_table::{Partition, PartitionPlacement, PartitionTable};
 use restate_types::{NodeId, PlainNodeId, Version};
 
 use crate::cluster_controller::logs_controller;
@@ -139,7 +138,7 @@ impl Scheduler {
         // todo(azmy): avoid cloning the partition table every time by keeping
         // the latest built always available as a field
         let mut builder = partition_table.clone().into_builder();
-        let replication_strategy = builder.replication_strategy();
+        let replication_strategy = builder.placement_strategy().clone();
 
         builder.for_each(|partition_id, placement| {
             let mut target_state = TargetPartitionPlacementState::new(placement);
@@ -147,7 +146,7 @@ impl Scheduler {
                 partition_id,
                 &mut target_state,
                 alive_workers,
-                replication_strategy,
+                replication_strategy.clone(),
                 nodes_config,
                 &placement_hints,
             );
@@ -234,7 +233,7 @@ impl Scheduler {
         partition_id: &PartitionId,
         target_state: &mut TargetPartitionPlacementState,
         alive_workers: &HashSet<PlainNodeId>,
-        replication_strategy: ReplicationStrategy,
+        placement_strategy: Option<ReplicationProperty>,
         nodes_config: &NodesConfiguration,
         placement_hints: &H,
     ) {
@@ -243,17 +242,22 @@ impl Scheduler {
             .node_set
             .retain(|node_id| alive_workers.contains(node_id));
 
-        match replication_strategy {
-            ReplicationStrategy::OnAllNodes => {
+        match placement_strategy {
+            None => {
                 // The extend will only add the new nodes that
                 // don't exist in the node set.
                 // the retain done above will make sure alive nodes in the set
                 // will keep there initial order.
                 target_state.node_set.extend(alive_workers.iter().cloned());
             }
-            ReplicationStrategy::Factor(replication_factor) => {
-                let replication_factor =
-                    usize::try_from(replication_factor.get()).expect("u32 should fit into usize");
+            Some(replication_factor) => {
+                assert_eq!(
+                    replication_factor.greatest_defined_scope(),
+                    LocationScope::Node,
+                    "Location aware partition replication is not supported yet"
+                );
+
+                let replication_factor = usize::from(replication_factor.num_copies());
 
                 if target_state.node_set.len() == replication_factor {
                     return;
@@ -537,6 +541,7 @@ mod tests {
     use http::Uri;
     use rand::prelude::ThreadRng;
     use rand::Rng;
+    use restate_types::replicated_loglet::ReplicationProperty;
     use std::collections::BTreeMap;
     use std::iter;
     use std::num::NonZero;
@@ -565,7 +570,7 @@ mod tests {
         LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState,
     };
     use restate_types::partition_table::{
-        PartitionPlacement, PartitionTable, PartitionTableBuilder, ReplicationStrategy,
+        PartitionPlacement, PartitionTable, PartitionTableBuilder,
     };
     use restate_types::time::MillisSinceEpoch;
     use restate_types::{GenerationalNodeId, PlainNodeId, Version};
@@ -618,21 +623,21 @@ mod tests {
 
     #[test(restate_core::test(start_paused = true))]
     async fn schedule_partitions_with_replication_factor() -> googletest::Result<()> {
-        schedule_partitions(ReplicationStrategy::Factor(
+        schedule_partitions(Some(ReplicationProperty::new(
             NonZero::new(3).expect("non-zero"),
-        ))
+        )))
         .await?;
         Ok(())
     }
 
     #[test(restate_core::test(start_paused = true))]
     async fn schedule_partitions_with_all_nodes_replication() -> googletest::Result<()> {
-        schedule_partitions(ReplicationStrategy::OnAllNodes).await?;
+        schedule_partitions(None).await?;
         Ok(())
     }
 
     async fn schedule_partitions(
-        replication_strategy: ReplicationStrategy,
+        placement_strategy: Option<ReplicationProperty>,
     ) -> googletest::Result<()> {
         let num_partitions = 64;
         let num_nodes = 5;
@@ -687,7 +692,8 @@ mod tests {
         let mut partition_table_builder =
             PartitionTable::with_equally_sized_partitions(Version::MIN, num_partitions)
                 .into_builder();
-        partition_table_builder.set_replication_strategy(replication_strategy);
+        partition_table_builder.set_placement_strategy(placement_strategy.clone());
+
         let partition_table = partition_table_builder.build();
 
         let metadata_store_client = builder.metadata_store_client.clone();
@@ -742,8 +748,8 @@ mod tests {
         for (_, partition) in target_partition_table.partitions_mut() {
             let target_state = TargetPartitionPlacementState::new(&mut partition.placement);
             // assert that the replication strategy was respected
-            match replication_strategy {
-                ReplicationStrategy::OnAllNodes => {
+            match &placement_strategy {
+                None => {
                     // assert that every partition has a leader which is part of the alive nodes set
                     assert!(target_state.contains_all(&alive_nodes));
 
                     assert!(target_state
                         .leader
                         .is_some_and(|leader| alive_nodes.contains(&leader)));
                 }
-                ReplicationStrategy::Factor(replication_factor) => {
+                Some(replication_property) => {
                     // assert that every partition has a leader which is part of the alive nodes set
                     assert!(target_state
                        .leader
@@ -759,10 +765,9 @@ mod tests {
 
                     assert_eq!(
                         target_state.node_set.len(),
-                        alive_nodes.len().min(
-                            usize::try_from(replication_factor.get())
-                                .expect("u32 fits into usize")
-                        )
+                        alive_nodes
+                            .len()
+                            .min(usize::from(replication_property.num_copies()))
                     );
                 }
             }
@@ -785,7 +790,7 @@ mod tests {
 
         let partition_table = run_ensure_replication_test(
             partition_table_builder,
-            ReplicationStrategy::Factor(num_partition_processors),
+            Some(ReplicationProperty::new(num_partition_processors)),
         )
         .await?;
         let partition = partition_table
@@ -817,7 +822,7 @@ mod tests {
 
         let partition_table = run_ensure_replication_test(
             partition_table_builder,
-            ReplicationStrategy::Factor(num_partition_processors),
+            Some(ReplicationProperty::new(num_partition_processors)),
         )
         .await?;
         let partition = partition_table
@@ -834,7 +839,7 @@ mod tests {
 
     async fn run_ensure_replication_test(
         mut partition_table_builder: PartitionTableBuilder,
-        replication_strategy: ReplicationStrategy,
+        placement_strategy: Option<ReplicationProperty>,
     ) -> googletest::Result<PartitionTable> {
         let env = TestCoreEnv::create_with_single_node(0, 0).await;
@@ -858,7 +863,7 @@ mod tests {
             partition_id,
             &mut target_state,
             &alive_workers,
-            replication_strategy,
+            placement_strategy.clone(),
             &nodes_config,
             &NoPlacementHints,
         );
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index 3c4d32f11..ad4b418a2 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -27,10 +27,8 @@ use restate_types::logs::metadata::{
     LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex,
 };
 use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
-use restate_types::partition_table::{
-    self, PartitionTable, PartitionTableBuilder, ReplicationStrategy,
-};
-use restate_types::replicated_loglet::ReplicatedLogletParams;
+use restate_types::partition_table::{self, PartitionTable, PartitionTableBuilder};
+use restate_types::replicated_loglet::{ReplicatedLogletParams, ReplicationProperty};
 
 use restate_bifrost::{Bifrost, SealedSegment};
 use restate_core::network::rpc_router::RpcRouter;
@@ -172,7 +170,7 @@ enum ClusterControllerCommand {
         response_tx: oneshot::Sender<anyhow::Result<()>>,
     },
     UpdateClusterConfiguration {
-        replication_strategy: ReplicationStrategy,
+        placement_strategy: Option<ReplicationProperty>,
         default_provider: ProviderConfiguration,
         response_tx: oneshot::Sender<anyhow::Result<()>>,
     },
@@ -237,7 +235,7 @@ impl ClusterControllerHandle {
 
     pub async fn update_cluster_configuration(
         &self,
-        replication_strategy: ReplicationStrategy,
+        placement_strategy: Option<ReplicationProperty>,
         default_provider: ProviderConfiguration,
     ) -> Result<anyhow::Result<()>, ShutdownError> {
         let (response_tx, response_rx) = oneshot::channel();
@@ -245,7 +243,7 @@ impl ClusterControllerHandle {
         let _ = self
             .tx
             .send(ClusterControllerCommand::UpdateClusterConfiguration {
-                replication_strategy,
+                placement_strategy,
                 default_provider,
                 response_tx,
             })
@@ -389,7 +387,7 @@ impl Service {
 
     async fn update_cluster_configuration(
         &self,
-        replication_strategy: ReplicationStrategy,
+        placement_strategy: Option<ReplicationProperty>,
         default_provider: ProviderConfiguration,
     ) -> anyhow::Result<()> {
         let logs = self
@@ -446,8 +444,8 @@ impl Service {
 
         let mut builder: PartitionTableBuilder = partition_table.into();
 
-        if builder.replication_strategy() != replication_strategy {
-            builder.set_replication_strategy(replication_strategy);
+        if builder.placement_strategy() != &placement_strategy {
+            builder.set_placement_strategy(placement_strategy.clone());
         }
 
         builder
@@ -522,7 +520,7 @@ impl Service {
                     .await;
             }
             ClusterControllerCommand::UpdateClusterConfiguration {
-                replication_strategy,
+                placement_strategy: replication_strategy,
                 default_provider,
                 response_tx,
             } => {
diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto
index 0467e068b..d5e64dde4 100644
--- a/crates/core/protobuf/node_ctl_svc.proto
+++ b/crates/core/protobuf/node_ctl_svc.proto
@@ -22,7 +22,8 @@ service NodeCtlSvc {
   rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);
 
   // Provision the Restate cluster on this node.
-  rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse);
+  rpc ProvisionCluster(ProvisionClusterRequest)
+      returns (ProvisionClusterResponse);
 }
 
 message ProvisionClusterRequest {
@@ -30,9 +31,9 @@ message ProvisionClusterRequest {
   // if unset then the configured cluster num partitions will be used
   optional uint32 num_partitions = 2;
   // if unset then the configured cluster placement strategy will be used
-  optional restate.cluster.ReplicationStrategy placement_strategy = 3;
+  optional restate.cluster.ReplicationProperty partition_placement_strategy = 3;
   // if unset then the configured cluster default log provider will be used
-  optional restate.cluster.DefaultProvider log_provider = 4;
+  optional restate.cluster.BifrostProvider log_provider = 4;
 }
 
 message ProvisionClusterResponse {
@@ -60,8 +61,8 @@ message IdentResponse {
 }
 
 message GetMetadataRequest {
-  // If set, we'll first sync with metadata store to esnure we are returning the latest value.
-  // Otherwise, we'll return the local value on this node.
+  // If set, we'll first sync with metadata store to ensure we are returning the
+  // latest value. Otherwise, we'll return the local value on this node.
   bool sync = 1;
   restate.common.MetadataKind kind = 2;
 }
diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs
index 647cab3f4..62961dd56 100644
--- a/crates/local-cluster-runner/src/node/mod.rs
+++ b/crates/local-cluster-runner/src/node/mod.rs
@@ -18,7 +18,7 @@ use restate_core::network::net_util::create_tonic_channel;
 use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
 use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest as ProtoProvisionClusterRequest;
 use restate_types::logs::metadata::ProviderConfiguration;
-use restate_types::partition_table::ReplicationStrategy;
+use restate_types::protobuf::cluster::ReplicationProperty;
 use restate_types::retries::RetryPolicy;
 use restate_types::{
     config::{Configuration, MetadataStoreClient},
@@ -759,7 +759,7 @@ impl StartedNode {
     pub async fn provision_cluster(
         &self,
         num_partitions: Option<NonZeroU16>,
-        placement_strategy: Option<ReplicationStrategy>,
+        partition_placement_strategy: Option<ReplicationProperty>,
         log_provider: Option<ProviderConfiguration>,
     ) -> anyhow::Result<bool> {
         let channel = create_tonic_channel(
@@ -770,7 +770,7 @@ impl StartedNode {
         let request = ProtoProvisionClusterRequest {
             dry_run: false,
             num_partitions: num_partitions.map(|num| u32::from(num.get())),
-            placement_strategy: placement_strategy.map(Into::into),
+            partition_placement_strategy: partition_placement_strategy.map(Into::into),
             log_provider: log_provider.map(|log_provider| log_provider.into()),
         };
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index 4a89f99e0..e6065a393 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -16,6 +16,7 @@ mod roles;
 use anyhow::Context;
 use bytestring::ByteString;
 use prost_dto::IntoProst;
+use restate_types::replicated_loglet::ReplicationProperty;
 use std::num::NonZeroU16;
 use tracing::{debug, error, info, trace, warn};
@@ -47,7 +48,7 @@ use restate_types::logs::metadata::{Logs, LogsConfiguration, ProviderConfigurati
 use restate_types::logs::RecordCache;
 use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
 use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
-use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy};
+use restate_types::partition_table::{PartitionTable, PartitionTableBuilder};
 use restate_types::protobuf::common::{
     AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeRpcStatus, NodeStatus,
     WorkerStatus,
@@ -544,10 +545,9 @@ impl Node {
 pub struct ClusterConfiguration {
     #[into_prost(map = "num_partitions_to_u32")]
     pub num_partitions: NonZeroU16,
+    pub partition_placement_strategy: Option<ReplicationProperty>,
     #[prost(required)]
-    pub replication_strategy: ReplicationStrategy,
-    #[prost(required)]
-    pub default_provider: ProviderConfiguration,
+    pub bifrost_provider: ProviderConfiguration,
 }
 
 fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 {
@@ -558,8 +558,8 @@ impl ClusterConfiguration {
     pub fn from_configuration(configuration: &Configuration) -> Self {
         ClusterConfiguration {
             num_partitions: configuration.common.bootstrap_num_partitions,
-            replication_strategy: ReplicationStrategy::default(),
-            default_provider: ProviderConfiguration::from_configuration(configuration),
+            partition_placement_strategy: None,
+            bifrost_provider: ProviderConfiguration::from_configuration(configuration),
         }
     }
 }
@@ -633,11 +633,11 @@ fn generate_initial_metadata(
         .with_equally_sized_partitions(cluster_configuration.num_partitions.get())
         .expect("Empty partition table should not have conflicts");
     initial_partition_table_builder
-        .set_replication_strategy(cluster_configuration.replication_strategy);
+        .set_placement_strategy(cluster_configuration.partition_placement_strategy.clone());
     let initial_partition_table = initial_partition_table_builder.build();
 
     let initial_logs = Logs::with_logs_configuration(LogsConfiguration::from(
-        cluster_configuration.default_provider.clone(),
+        cluster_configuration.bifrost_provider.clone(),
     ));
 
     let initial_nodes_configuration = create_initial_nodes_configuration(common_opts);
diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs
index 05aa64046..6c5ecc944 100644
--- a/crates/node/src/network_server/grpc_svc_handler.rs
+++ b/crates/node/src/network_server/grpc_svc_handler.rs
@@ -14,6 +14,7 @@ use anyhow::Context;
 use bytes::BytesMut;
 use enumset::EnumSet;
 use futures::stream::BoxStream;
+use restate_types::replicated_loglet::ReplicationProperty;
 use tokio_stream::StreamExt;
 use tonic::{Request, Response, Status, Streaming};
@@ -31,7 +32,6 @@ use restate_types::config::Configuration;
 use restate_types::health::Health;
 use restate_types::logs::metadata::ProviderConfiguration;
 use restate_types::nodes_config::Role;
-use restate_types::partition_table::ReplicationStrategy;
 use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration;
 use restate_types::protobuf::node::Message;
 use restate_types::storage::StorageCodec;
@@ -79,20 +79,19 @@ impl NodeCtlSvcHandler {
             })
             .transpose()?
             .unwrap_or(config.common.bootstrap_num_partitions);
-        let replication_strategy = request
-            .placement_strategy
-            .map(ReplicationStrategy::try_from)
-            .transpose()?
-            .unwrap_or_default();
-        let default_provider = request
+        let partition_placement_strategy = request
+            .partition_placement_strategy
+            .map(ReplicationProperty::try_from)
+            .transpose()?;
+        let bifrost_provider = request
             .log_provider
             .map(ProviderConfiguration::try_from)
             .unwrap_or_else(|| Ok(ProviderConfiguration::from_configuration(config)))?;
 
         Ok(ClusterConfiguration {
             num_partitions,
-            replication_strategy,
-            default_provider,
+            partition_placement_strategy,
+            bifrost_provider,
         })
     }
 }
diff --git a/crates/types/protobuf/restate/cluster.proto b/crates/types/protobuf/restate/cluster.proto
index e94f558e7..51893e2be 100644
--- a/crates/types/protobuf/restate/cluster.proto
+++ b/crates/types/protobuf/restate/cluster.proto
@@ -77,28 +77,16 @@ message PartitionProcessorStatus {
   optional restate.common.Lsn target_tail_lsn = 11;
 }
 
-message ReplicatedProviderConfig { string replication_property = 1; }
+message ReplicationProperty { string replication_property = 1; }
 
-message DefaultProvider {
+message BifrostProvider {
   string provider = 1;
   // only required if provider = "replicated"
-  optional ReplicatedProviderConfig replicated_config = 2;
-}
-
-enum ReplicationStrategyKind {
-  ReplicationStrategyKind_UNKNOWN = 0;
-  OnAllNodes = 1;
-  Factor = 2;
-}
-
-message ReplicationStrategy {
-  ReplicationStrategyKind kind = 1;
-  // required if kind == "Factor"
-  optional uint32 factor = 2;
+  optional ReplicationProperty replication_property = 2;
 }
 
 message ClusterConfiguration {
   uint32 num_partitions = 1;
-  ReplicationStrategy replication_strategy = 2;
-  DefaultProvider default_provider = 3;
+  optional ReplicationProperty partition_placement_strategy = 2;
+  BifrostProvider bifrost_provider = 3;
 }
diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs
index 055bf8aad..4271e03ae 100644
--- a/crates/types/src/config/admin.rs
+++ b/crates/types/src/config/admin.rs
@@ -8,8 +8,6 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use crate::partition_table::ReplicationStrategy;
-
 use super::QueryEngineOptions;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
@@ -68,12 +66,6 @@ pub struct AdminOptions {
     #[cfg_attr(feature = "schemars", schemars(with = "String"))]
     pub log_tail_update_interval: humantime::Duration,
 
-    /// # Default replication strategy
-    ///
-    /// The default replication strategy to be used by the cluster controller to schedule partition
-    /// processors.
-    pub default_replication_strategy: ReplicationStrategy,
-
     #[cfg(any(test, feature = "test-util"))]
     pub disable_cluster_controller: bool,
 }
@@ -111,7 +103,6 @@ impl Default for AdminOptions {
             // try to trim the log every hour
             log_trim_interval: Some(Duration::from_secs(60 * 60).into()),
             log_trim_threshold: 1000,
-            default_replication_strategy: ReplicationStrategy::OnAllNodes,
             #[cfg(any(test, feature = "test-util"))]
             disable_cluster_controller: false,
             log_tail_update_interval: Duration::from_secs(5 * 60).into(),
diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs
index 0da479ea5..d3f5aae99 100644
--- a/crates/types/src/logs/metadata.rs
+++ b/crates/types/src/logs/metadata.rs
@@ -150,11 +150,11 @@ impl ProviderConfiguration {
     }
 }
 
-impl From<ProviderConfiguration> for crate::protobuf::cluster::DefaultProvider {
+impl From<ProviderConfiguration> for crate::protobuf::cluster::BifrostProvider {
     fn from(value: ProviderConfiguration) -> Self {
         use crate::protobuf::cluster;
 
-        let mut result = crate::protobuf::cluster::DefaultProvider::default();
+        let mut result = crate::protobuf::cluster::BifrostProvider::default();
 
         match value {
             ProviderConfiguration::Local => result.provider = ProviderKind::Local.to_string(),
@@ -162,7 +162,7 @@ impl From<ProviderConfiguration> for crate::protobuf::cluster::DefaultProvider {
             ProviderConfiguration::InMemory => result.provider = ProviderKind::InMemory.to_string(),
             ProviderConfiguration::Replicated(config) => {
                 result.provider = ProviderKind::Replicated.to_string();
-                result.replicated_config = Some(cluster::ReplicatedProviderConfig {
+                result.replication_property = Some(cluster::ReplicationProperty {
                     replication_property: config.replication_property.to_string(),
                 })
             }
@@ -172,9 +172,9 @@ impl From<ProviderConfiguration> for crate::protobuf::cluster::DefaultProvider {
     }
 }
 
-impl TryFrom<crate::protobuf::cluster::DefaultProvider> for ProviderConfiguration {
+impl TryFrom<crate::protobuf::cluster::BifrostProvider> for ProviderConfiguration {
     type Error = anyhow::Error;
-    fn try_from(value: crate::protobuf::cluster::DefaultProvider) -> Result<Self, Self::Error> {
+    fn try_from(value: crate::protobuf::cluster::BifrostProvider) -> Result<Self, Self::Error> {
         let provider_kind: ProviderKind = value.provider.parse()?;
 
         match provider_kind {
@@ -182,7 +182,7 @@ impl TryFrom<crate::protobuf::cluster::DefaultProvider> for ProviderConfiguratio
             #[cfg(any(test, feature = "memory-loglet"))]
             ProviderKind::InMemory => Ok(Self::InMemory),
             ProviderKind::Replicated => {
-                let config = value.replicated_config.ok_or_else(|| {
+                let config = value.replication_property.ok_or_else(|| {
                     anyhow::anyhow!("replicate_config is required with replicated provider")
                 })?;
diff --git a/crates/types/src/partition_table.rs b/crates/types/src/partition_table.rs
index b8371d0cb..6bb7a25ed 100644
--- a/crates/types/src/partition_table.rs
+++ b/crates/types/src/partition_table.rs
@@ -9,28 +9,19 @@
 // by the Apache License, Version 2.0.
 
 use std::collections::BTreeMap;
-use std::fmt::Display;
 use std::hash::{Hash, Hasher};
-use std::num::{NonZero, NonZeroU32};
 use std::ops::RangeInclusive;
-use std::str::FromStr;
-use std::sync::LazyLock;
 
-use anyhow::Context;
 use indexmap::IndexSet;
-use regex::Regex;
+use serde_with::{serde_as, DisplayFromStr};
 use xxhash_rust::xxh3::{self, Xxh3Builder};
 
 use crate::cluster::cluster_state::RunMode;
 use crate::identifiers::{PartitionId, PartitionKey};
 use crate::logs::LogId;
-use crate::protobuf::cluster::ReplicationStrategy as ProtoReplicationStrategy;
+use crate::replicated_loglet::ReplicationProperty;
 use crate::{flexbuffers_storage_encode_decode, PlainNodeId, Version, Versioned};
 
-static REPLICATION_STRATEGY_FACTOR_PATTERN: LazyLock<Regex> = LazyLock::new(|| {
-    Regex::new(r"^(?i)factor\(\s*(?<factor>\d+)\s*\)$").expect("is valid pattern")
-});
-
 const DB_NAME: &str = "db";
 const PARTITION_CF_PREFIX: &str = "data-";
@@ -68,7 +59,7 @@ pub struct PartitionTable {
     // are not visible from this index structure.
     partition_key_index: BTreeMap<PartitionKey, PartitionId>,
 
-    replication_strategy: ReplicationStrategy,
+    placement_strategy: Option<ReplicationProperty>,
 }
 
 impl Default for PartitionTable {
@@ -77,7 +68,7 @@ impl Default for PartitionTable {
             version: Version::INVALID,
             partitions: BTreeMap::default(),
             partition_key_index: BTreeMap::default(),
-            replication_strategy: ReplicationStrategy::default(),
+            placement_strategy: None,
         }
     }
 }
@@ -136,8 +127,8 @@ impl PartitionTable {
         self.partitions.contains_key(partition_id)
     }
 
-    pub fn replication_strategy(&self) -> ReplicationStrategy {
-        self.replication_strategy
+    pub fn placement_strategy(&self) -> &Option<ReplicationProperty> {
+        &self.placement_strategy
     }
 
     pub fn into_builder(self) -> PartitionTableBuilder {
@@ -368,15 +359,15 @@ impl PartitionTableBuilder {
         self.inner.num_partitions()
     }
 
-    pub fn set_replication_strategy(&mut self, replication_strategy: ReplicationStrategy) {
-        if self.inner.replication_strategy != replication_strategy {
-            self.inner.replication_strategy = replication_strategy;
+    pub fn set_placement_strategy(&mut self, placement_strategy: Option<ReplicationProperty>) {
+        if self.inner.placement_strategy != placement_strategy {
+            self.inner.placement_strategy = placement_strategy;
             self.modified = true;
         }
     }
 
-    pub fn replication_strategy(&self) -> ReplicationStrategy {
-        self.inner.replication_strategy
+    pub fn placement_strategy(&self) -> &Option<ReplicationProperty> {
+        &self.inner.placement_strategy
     }
 
     /// Adds a new partition to the partition table. The newly added partition must exist and must
@@ -483,7 +474,7 @@ pub struct PartitionShadow {
 
 /// Serialization helper which handles the deserialization of the current and older
 /// [`PartitionTable`] versions.
-#[serde_with::serde_as]
+#[serde_as]
 #[derive(serde::Serialize, serde::Deserialize)]
 struct PartitionTableShadow {
     version: Version,
@@ -494,7 +485,9 @@ struct PartitionTableShadow {
     // flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs
     #[serde_as(as = "Option<Vec<(_, _)>>")]
     partitions: Option<Vec<(PartitionId, PartitionShadow)>>,
-    replication_strategy: Option<ReplicationStrategy>,
+
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    placement_strategy: Option<ReplicationProperty>,
 }
 
 impl From<PartitionTable> for PartitionTableShadow {
@@ -520,7 +513,7 @@ impl From<PartitionTable> for PartitionTableShadow {
                     })
                     .collect(),
             ),
-            replication_strategy: Some(value.replication_strategy),
+            placement_strategy: value.placement_strategy,
         }
     }
 }
@@ -531,7 +524,7 @@ impl TryFrom<PartitionTableShadow> for PartitionTable {
     fn try_from(value: PartitionTableShadow) -> Result<Self, Self::Error> {
         let mut builder = PartitionTableBuilder::new(value.version);
         // replication strategy is unset if data has been written with version <= v1.1.3
-        builder.set_replication_strategy(value.replication_strategy.unwrap_or_default());
+        builder.set_placement_strategy(value.placement_strategy);
 
         match value.partitions {
             Some(partitions) => {
@@ -621,102 +614,12 @@ impl Iterator for EqualSizedPartitionPartitioner {
     }
 }
 
-/// Replication strategy for partition processors.
-#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
-#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
-#[serde(rename_all = "kebab-case")]
-pub enum ReplicationStrategy {
-    /// Schedule partition processor replicas on all available nodes
-    #[default]
-    OnAllNodes,
-    /// Schedule this number of partition processor replicas
-    Factor(NonZero<u32>),
-}
-
-impl TryFrom<ProtoReplicationStrategy> for ReplicationStrategy {
-    type Error = anyhow::Error;
-
-    fn try_from(value: ProtoReplicationStrategy) -> Result<Self, Self::Error> {
-        use crate::protobuf::cluster::ReplicationStrategyKind;
-
-        if value.kind == i32::from(ReplicationStrategyKind::OnAllNodes) {
-            Ok(Self::OnAllNodes)
-        } else if value.kind == i32::from(ReplicationStrategyKind::Factor) {
-            let factor = value.factor.ok_or_else(|| {
-                anyhow::anyhow!("factor is require with Factor replication strategy")
-            })?;
-
-            let factor =
-                NonZeroU32::new(factor).context("Replication strategy factor must be non zero")?;
-            Ok(Self::Factor(factor))
-        } else {
-            anyhow::bail!("Unknown replication strategy")
-        }
-    }
-}
-
-impl Display for ReplicationStrategy {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Self::OnAllNodes => {
-                write!(f, "on-all-nodes")
-            }
-            Self::Factor(factor) => {
-                write!(f, "factor({})", factor)
-            }
-        }
-    }
-}
-
-impl FromStr for ReplicationStrategy {
-    type Err = anyhow::Error;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s.to_ascii_lowercase().as_str() {
-            "on-all-nodes" => Ok(Self::OnAllNodes),
-            "factor" => anyhow::bail!("Missing replication factor value. Should be 'factor(<factor>)'."),
-            s => {
-                let Some(m) = REPLICATION_STRATEGY_FACTOR_PATTERN.captures(s) else {
-                    anyhow::bail!("Unknown replication strategy '{}'", s);
-                };
-
-                let factor: NonZeroU32 = m["factor"]
-                    .parse()
-                    .context("Invalid replication strategy factor")?;
-
-                Ok(Self::Factor(factor))
-            }
-        }
-    }
-}
-
-impl From<ReplicationStrategy> for ProtoReplicationStrategy {
-    fn from(value: ReplicationStrategy) -> Self {
-        use crate::protobuf::cluster::ReplicationStrategyKind;
-
-        let mut result = Self::default();
-        match value {
-            ReplicationStrategy::OnAllNodes => {
-                result.kind = ReplicationStrategyKind::OnAllNodes.into()
-            }
-            ReplicationStrategy::Factor(factor) => {
-                result.kind = ReplicationStrategyKind::Factor.into();
-                result.factor = Some(factor.get());
-            }
-        };
-
-        result
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use std::num::NonZeroU32;
-
     use bytes::BytesMut;
     use test_log::test;
 
-    use super::{PartitionPlacement, ReplicationStrategy};
+    use super::PartitionPlacement;
     use crate::identifiers::{PartitionId, PartitionKey};
     use crate::partition_table::{
         EqualSizedPartitionPartitioner, FindPartition, Partition, PartitionTable,
@@ -725,20 +628,6 @@ mod tests {
     use crate::storage::StorageCodec;
     use crate::{flexbuffers_storage_encode_decode, PlainNodeId, Version};
 
-    #[test]
-    fn test_replication_strategy_parse() {
-        let strategy: ReplicationStrategy = "on-all-nodes".parse().unwrap();
-        assert_eq!(ReplicationStrategy::OnAllNodes, strategy);
-
-        let strategy: ReplicationStrategy = "factor(10)".parse().unwrap();
-        assert_eq!(
-            ReplicationStrategy::Factor(NonZeroU32::new(10).expect("is non zero")),
-            strategy
-        );
-
-        let strategy: anyhow::Result<ReplicationStrategy> = "factor(0)".parse();
-        assert!(strategy.is_err());
-    }
 
     #[test]
     fn partitioner_produces_consecutive_ranges() {
         let partitioner = EqualSizedPartitionPartitioner::new(10);
diff --git a/crates/types/src/protobuf.rs b/crates/types/src/protobuf.rs
index 1be1d15a9..45d8ba801 100644
--- a/crates/types/src/protobuf.rs
+++ b/crates/types/src/protobuf.rs
@@ -46,6 +46,7 @@ pub mod common {
 }
 
 pub mod cluster {
+
     include!(concat!(env!("OUT_DIR"), "/restate.cluster.rs"));
 
     impl std::fmt::Display for RunMode {
@@ -58,6 +59,22 @@ pub mod cluster {
             write!(f, "{o}")
         }
     }
+
+    impl From<crate::replicated_loglet::ReplicationProperty> for ReplicationProperty {
+        fn from(value: crate::replicated_loglet::ReplicationProperty) -> Self {
+            ReplicationProperty {
+                replication_property: value.to_string(),
+            }
+        }
+    }
+
+    impl TryFrom<ReplicationProperty> for crate::replicated_loglet::ReplicationProperty {
+        type Error = anyhow::Error;
+
+        fn try_from(value: ReplicationProperty) -> Result<Self, Self::Error> {
+            value.replication_property.parse()
+        }
+    }
 }
 
 pub mod node {
diff --git a/tools/restatectl/src/commands/cluster/config.rs b/tools/restatectl/src/commands/cluster/config.rs
index ee941dfa9..6adeff6db 100644
--- a/tools/restatectl/src/commands/cluster/config.rs
+++ b/tools/restatectl/src/commands/cluster/config.rs
@@ -16,8 +16,7 @@ use std::fmt::{self, Display, Write};
 use cling::prelude::*;
 
 use restate_types::{
-    logs::metadata::ProviderConfiguration, partition_table::ReplicationStrategy,
-    protobuf::cluster::ClusterConfiguration,
+    logs::metadata::ProviderConfiguration, protobuf::cluster::ClusterConfiguration,
 };
 
 #[derive(Run, Subcommand, Clone)]
@@ -38,13 +37,16 @@ pub fn cluster_config_string(config: &ClusterConfiguration) -> anyhow::Result<S
-    /// Replication strategy. Possible values
-    /// are `on-all-nodes` or `factor(n)`
-    #[clap(long)]
-    replication_strategy: Option<ReplicationStrategy>,
+    /// Optional partition placement strategy. By default replicates
+    /// partitions on all nodes. Accepts a replication property
+    /// string as a value.
+    #[clap(long)]
+    partition_placement_strategy: Option<ReplicationProperty>,
 
-    /// Default provider kind
+    /// Default provider kind.
     #[clap(long)]
     bifrost_provider: Option<ProviderKind>,
 
-    /// Replication property
+    /// Replication property for `replicated` bifrost provider.
     #[clap(long, required_if_eq("bifrost_provider", "replicated"))]
     replication_property: Option<ReplicationProperty>,
 }
@@ -60,9 +60,10 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an
 
     let current_config_string = cluster_config_string(&current)?;
 
-    if let Some(replication_strategy) = set_opts.replication_strategy {
-        current.replication_strategy = Some(replication_strategy.into());
-    }
+    current.partition_placement_strategy = set_opts
+        .partition_placement_strategy
+        .clone()
+        .map(Into::into);
 
     if let Some(provider) = set_opts.bifrost_provider {
         let default_provider =
@@ -77,7 +78,7 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an
             }
         }
 
-        current.default_provider = Some(default_provider.into());
+        current.bifrost_provider = Some(default_provider.into());
     }
 
     let updated_config_string = cluster_config_string(&current)?;
diff --git a/tools/restatectl/src/commands/cluster/provision.rs b/tools/restatectl/src/commands/cluster/provision.rs
index 65669d213..c869c6a13 100644
--- a/tools/restatectl/src/commands/cluster/provision.rs
+++ b/tools/restatectl/src/commands/cluster/provision.rs
@@ -19,7 +19,6 @@ use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
 use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest;
 use restate_types::logs::metadata::{ProviderConfiguration, ProviderKind, ReplicatedLogletConfig};
 use restate_types::net::AdvertisedAddress;
-use restate_types::partition_table::ReplicationStrategy;
 use restate_types::replicated_loglet::ReplicationProperty;
 use std::num::NonZeroU16;
 use tonic::codec::CompressionEncoding;
@@ -36,10 +35,11 @@ pub struct ProvisionOpts {
     #[clap(long)]
     num_partitions: Option<NonZeroU16>,
 
-    /// Replication strategy. Possible values
-    /// are `on-all-nodes` or `factor(n)`
+    /// Optional partition placement strategy. By default replicates
+    /// partitions on all nodes. Accepts a replication property
+    /// string as a value.
     #[clap(long)]
-    replication_strategy: Option<ReplicationStrategy>,
+    partition_placement_strategy: Option<ReplicationProperty>,
 
     /// Default log provider kind
     #[clap(long)]
@@ -72,7 +72,10 @@ async fn cluster_provision(
     let request = ProvisionClusterRequest {
         dry_run: true,
         num_partitions: provision_opts.num_partitions.map(|n| u32::from(n.get())),
-        placement_strategy: provision_opts.replication_strategy.map(Into::into),
+        partition_placement_strategy: provision_opts
+            .partition_placement_strategy
+            .clone()
+            .map(Into::into),
         log_provider: log_provider.map(Into::into),
     };
@@ -97,7 +100,7 @@ async fn cluster_provision(
         cluster_config_string(&cluster_configuration_to_provision)?
     );
 
-    if let Some(default_provider) = &cluster_configuration_to_provision.default_provider {
+    if let Some(default_provider) = &cluster_configuration_to_provision.bifrost_provider {
         let default_provider = ProviderConfiguration::try_from(default_provider.clone())?;
 
         match default_provider {
@@ -115,8 +118,9 @@ async fn cluster_provision(
     let request = ProvisionClusterRequest {
         dry_run: false,
         num_partitions: Some(cluster_configuration_to_provision.num_partitions),
-        placement_strategy: cluster_configuration_to_provision.replication_strategy,
-        log_provider: cluster_configuration_to_provision.default_provider,
+        partition_placement_strategy: cluster_configuration_to_provision
+            .partition_placement_strategy,
+        log_provider: cluster_configuration_to_provision.bifrost_provider,
     };
 
     match client.provision_cluster(request).await {
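
Note: the new proto types round-trip `ReplicationProperty` through its `Display`/`FromStr` string form, and the scheduler currently rejects anything location-scoped. A minimal sketch of the resulting API, assuming `ReplicationProperty` keeps `Clone`/`PartialEq` and that the string syntax it parses matches its `Display` output — only `new`, `num_copies`, `greatest_defined_scope`, and the two proto conversions are taken directly from this diff:

```rust
use std::num::NonZero;

use restate_types::protobuf::cluster::ReplicationProperty as ProtoReplicationProperty;
use restate_types::replicated_loglet::{LocationScope, ReplicationProperty};

fn main() -> anyhow::Result<()> {
    // Stands in for the removed `ReplicationStrategy::Factor(3)`;
    // `None` now plays the role of `ReplicationStrategy::OnAllNodes`.
    let prop = ReplicationProperty::new(NonZero::new(3).expect("non-zero"));
    assert_eq!(usize::from(prop.num_copies()), 3);

    // The scheduler asserts this until location-aware placement is supported.
    assert_eq!(prop.greatest_defined_scope(), LocationScope::Node);

    // The proto message stores the property as a string and parses it back.
    let proto: ProtoReplicationProperty = prop.clone().into();
    let round_trip = ReplicationProperty::try_from(proto)?;
    assert_eq!(prop, round_trip);
    Ok(())
}
```

Because `partition_placement_strategy` is `optional` in the proto and `Option<ReplicationProperty>` everywhere else, provisioning without `--partition-placement-strategy` keeps the old replicate-on-all-nodes behavior.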