diff --git a/Cargo.lock b/Cargo.lock
index 7c7827a4e..ae59dc255 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5333,6 +5333,7 @@ name = "restate-cluster-controller"
 version = "0.9.1"
 dependencies = [
  "anyhow",
+ "arc-swap",
  "codederror",
  "derive_builder",
  "drain",
@@ -6312,6 +6313,7 @@ dependencies = [
 "opentelemetry",
 "pin-project",
 "prost",
+ "rand",
 "restate-bifrost",
 "restate-core",
 "restate-errors",
diff --git a/crates/cluster-controller/Cargo.toml b/crates/cluster-controller/Cargo.toml
index 2bbac7cb5..35712db5e 100644
--- a/crates/cluster-controller/Cargo.toml
+++ b/crates/cluster-controller/Cargo.toml
@@ -19,6 +19,7 @@ restate-node-protocol = { workspace = true }
 restate-types = { workspace = true }
 
 anyhow = { workspace = true }
+arc-swap = { workspace = true }
 codederror = { workspace = true }
 derive_builder = { workspace = true }
 drain = { workspace = true }
diff --git a/crates/cluster-controller/src/cluster_state.rs b/crates/cluster-controller/src/cluster_state.rs
new file mode 100644
index 000000000..16aed96ed
--- /dev/null
+++ b/crates/cluster-controller/src/cluster_state.rs
@@ -0,0 +1,230 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use arc_swap::ArcSwap;
+use restate_core::network::MessageRouterBuilder;
+use restate_network::rpc_router::RpcRouter;
+use restate_node_protocol::partition_processor_manager::GetProcessorsState;
+use restate_types::identifiers::PartitionId;
+use restate_types::nodes_config::Role;
+use restate_types::processors::PartitionProcessorStatus;
+use restate_types::time::MillisSinceEpoch;
+use tokio::task::JoinHandle;
+use tokio::time::Instant;
+
+use restate_core::{Metadata, ShutdownError, TaskCenter};
+use restate_network::Networking;
+use restate_types::{GenerationalNodeId, PlainNodeId, Version};
+
+/// A container for health information about every node and partition in the
+/// cluster.
+#[derive(Debug, Clone)]
+pub struct ClusterState {
+    pub last_refreshed: Option<Instant>,
+    pub nodes_config_version: Version,
+    pub partition_table_version: Version,
+    pub nodes: BTreeMap<PlainNodeId, NodeState>,
+}
+
+impl ClusterState {
+    pub fn is_reliable(&self) -> bool {
+        // todo: make this configurable
+        // If the cluster state is older than 10 seconds, then it is not reliable.
+        self.last_refreshed
+            .map(|last_refreshed| last_refreshed.elapsed().as_secs() < 10)
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug, Clone)]
+pub enum NodeState {
+    Alive {
+        last_heartbeat_at: MillisSinceEpoch,
+        generation: GenerationalNodeId,
+        partitions: BTreeMap<PartitionId, PartitionProcessorStatus>,
+    },
+    Dead {
+        last_seen_alive: Option<MillisSinceEpoch>,
+    },
+}
+
+pub struct ClusterStateRefresher {
+    task_center: TaskCenter,
+    metadata: Metadata,
+    get_state_router: RpcRouter<GetProcessorsState>,
+    updateable_cluster_state: Arc<ArcSwap<ClusterState>>,
+    in_flight_refresh: Option<JoinHandle<()>>,
+}
+
+impl ClusterStateRefresher {
+    pub fn new(
+        task_center: TaskCenter,
+        metadata: Metadata,
+        networking: Networking,
+        router_builder: &mut MessageRouterBuilder,
+    ) -> Self {
+        let get_state_router = RpcRouter::new(networking.clone(), router_builder);
+
+        let initial_state = ClusterState {
+            last_refreshed: None,
+            nodes_config_version: Version::INVALID,
+            partition_table_version: Version::INVALID,
+            nodes: BTreeMap::new(),
+        };
+        let updateable_cluster_state = Arc::new(ArcSwap::from_pointee(initial_state));
+
+        Self {
+            task_center,
+            metadata,
+            get_state_router,
+            updateable_cluster_state,
+            in_flight_refresh: None,
+        }
+    }
+
+    pub fn get_cluster_state(&self) -> Arc<ClusterState> {
+        self.updateable_cluster_state.load_full()
+    }
+
+    pub fn schedule_refresh(&mut self) -> Result<(), ShutdownError> {
+        // if a refresh is already in flight, ignore this request.
+        if let Some(handle) = &self.in_flight_refresh {
+            if handle.is_finished() {
+                self.in_flight_refresh = None;
+            } else {
+                // still in flight.
+                return Ok(());
+            }
+        }
+
+        self.in_flight_refresh = Self::start_refresh_task(
+            self.task_center.clone(),
+            self.get_state_router.clone(),
+            self.updateable_cluster_state.clone(),
+            self.metadata.clone(),
+        )?;
+
+        Ok(())
+    }
+
+    fn start_refresh_task(
+        tc: TaskCenter,
+        get_state_router: RpcRouter<GetProcessorsState>,
+        updateable_cluster_state: Arc<ArcSwap<ClusterState>>,
+        metadata: Metadata,
+    ) -> Result<Option<JoinHandle<()>>, ShutdownError> {
+        let task_center = tc.clone();
+        let refresh = async move {
+            let last_state = updateable_cluster_state.load();
+            // make sure we have a partition table that is equal to or newer than the
+            // one seen at the last refresh
+            let partition_table = metadata
+                .wait_for_partition_table(last_state.partition_table_version)
+                .await?;
+            let _ = metadata
+                .wait_for_version(
+                    restate_core::MetadataKind::NodesConfiguration,
+                    last_state.nodes_config_version,
+                )
+                .await;
+            let nodes_config = metadata.nodes_config();
+
+            let mut nodes = BTreeMap::new();
+            let mut join_set = tokio::task::JoinSet::new();
+            for (node_id, node) in nodes_config.iter() {
+                // We are only interested in worker nodes.
+                if !node.has_role(Role::Worker) {
+                    continue;
+                }
+
+                let rpc_router = get_state_router.clone();
+                let tc = tc.clone();
+                join_set
+                    .build_task()
+                    .name("get-processors-state")
+                    .spawn(async move {
+                        tc.run_in_scope("get-processor-state", None, async move {
+                            (
+                                node_id,
+                                tokio::time::timeout(
+                                    // todo: make configurable
+                                    std::time::Duration::from_secs(1),
+                                    rpc_router.call(node_id.into(), &GetProcessorsState::default()),
+                                )
+                                .await,
+                            )
+                        })
+                        .await
+                    })
+                    .expect("to spawn task");
+            }
+            while let Some(Ok((node_id, res))) = join_set.join_next().await {
+                // Did the node time out? Consider it dead.
+                // Important note: this is a naive mechanism for failure detection and
+                // should be considered a temporary design until we build more robust
+                // detection that accounts for flaky or bouncy nodes. We assume that the
+                // timeout is large enough that a single timeout signifies a dead node.
+                //
+                // The node gets the same treatment on other RpcErrors.
+                let Ok(Ok(res)) = res else {
+                    // determine the last time the node was seen alive.
+                    let last_seen_alive =
+                        last_state
+                            .nodes
+                            .get(&node_id)
+                            .and_then(|state| match state {
+                                NodeState::Alive {
+                                    last_heartbeat_at, ..
+                                } => Some(*last_heartbeat_at),
+                                NodeState::Dead { last_seen_alive } => *last_seen_alive,
+                            });
+
+                    nodes.insert(node_id, NodeState::Dead { last_seen_alive });
+                    continue;
+                };
+
+                let (from, msg) = res.split();
+
+                nodes.insert(
+                    node_id,
+                    NodeState::Alive {
+                        last_heartbeat_at: MillisSinceEpoch::now(),
+                        generation: from,
+                        partitions: msg.state,
+                    },
+                );
+            }
+
+            let state = ClusterState {
+                last_refreshed: Some(Instant::now()),
+                nodes_config_version: nodes_config.version(),
+                partition_table_version: partition_table.version(),
+                nodes,
+            };
+
+            // publish the new state
+            updateable_cluster_state.store(Arc::new(state));
+            Ok(())
+        };
+
+        let task_id = task_center.spawn(
+            restate_core::TaskKind::Disposable,
+            "cluster-state-refresh",
+            None,
+            refresh,
+        )?;
+
+        // If this returns None, it means that the task completed or was
+        // cancelled before we got to this point.
+        Ok(task_center.take_task(task_id))
+    }
+}
diff --git a/crates/cluster-controller/src/lib.rs b/crates/cluster-controller/src/lib.rs
index ac9ac141f..dff387bb3 100644
--- a/crates/cluster-controller/src/lib.rs
+++ b/crates/cluster-controller/src/lib.rs
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+mod cluster_state;
 mod service;
 
 pub use service::{ClusterControllerHandle, Error, Service};
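`ClusterStateRefresher` above publishes each refreshed `ClusterState` through an `ArcSwap`: the refresh task builds a complete new snapshot and swaps it in atomically, while readers take lock-free, consistent views. Below is a minimal sketch of that pattern, using a simplified stand-in `State` type rather than the real restate types, and assuming the `arc-swap` crate is available:

```rust
use std::sync::Arc;
use arc_swap::ArcSwap;

// Simplified stand-in for `ClusterState`; the real type also carries node and
// partition health plus metadata versions.
#[derive(Debug)]
struct State {
    refreshed_at: std::time::Instant,
}

fn main() {
    // Writers publish whole new snapshots; readers never block.
    let published = Arc::new(ArcSwap::from_pointee(State {
        refreshed_at: std::time::Instant::now(),
    }));

    // Reader side: `load_full` hands out an owned `Arc` that stays valid even
    // if a refresh replaces the snapshot right afterwards.
    let snapshot: Arc<State> = published.load_full();
    println!("last refresh: {:?}", snapshot.refreshed_at);

    // Writer side: build a complete new state and swap it in atomically,
    // exactly like `updateable_cluster_state.store(...)` above.
    published.store(Arc::new(State {
        refreshed_at: std::time::Instant::now(),
    }));
}
```

The trade-off is that every refresh rebuilds the whole state, which is acceptable here because refreshes are heartbeat-paced and the state is small relative to the cost of fine-grained locking.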
diff --git a/crates/cluster-controller/src/service.rs b/crates/cluster-controller/src/service.rs
index c4b2a9ece..c5602b35f 100644
--- a/crates/cluster-controller/src/service.rs
+++ b/crates/cluster-controller/src/service.rs
@@ -8,21 +8,31 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::sync::Arc;
+
 use codederror::CodedError;
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use tokio::time::Instant;
 
 use restate_network::Networking;
 use restate_node_protocol::cluster_controller::{
-    Action, AttachRequest, AttachResponse, RunMode, RunPartition,
+    Action, AttachRequest, AttachResponse, RunPartition,
 };
 use restate_node_protocol::common::{KeyRange, RequestId};
+use restate_types::arc_util::Updateable;
+use restate_types::config::AdminOptions;
 use restate_types::partition_table::FixedPartitionTable;
 
 use restate_core::network::{MessageRouterBuilder, NetworkSender};
-use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskCenter};
+use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskCenter};
 use restate_node_protocol::MessageEnvelope;
+use restate_types::processors::RunMode;
 use restate_types::{GenerationalNodeId, Version};
+use tokio::sync::{mpsc, oneshot};
+use tokio::time::MissedTickBehavior;
+
+use crate::cluster_state::{ClusterState, ClusterStateRefresher};
 
 #[derive(Debug, thiserror::Error, CodedError)]
 pub enum Error {
@@ -32,44 +42,94 @@ pub enum Error {
 }
 
 pub struct Service {
+    task_center: TaskCenter,
     metadata: Metadata,
     networking: Networking,
     incoming_messages: BoxStream<'static, MessageEnvelope<AttachRequest>>,
+    cluster_state_refresher: ClusterStateRefresher,
+    command_tx: mpsc::Sender<ClusterControllerCommand>,
+    command_rx: mpsc::Receiver<ClusterControllerCommand>,
 }
 
 impl Service {
     pub fn new(
+        task_center: TaskCenter,
         metadata: Metadata,
         networking: Networking,
         router_builder: &mut MessageRouterBuilder,
     ) -> Self {
         let incoming_messages = router_builder.subscribe_to_stream(10);
+        let (command_tx, command_rx) = mpsc::channel(2);
+
+        let cluster_state_refresher = ClusterStateRefresher::new(
+            task_center.clone(),
+            metadata.clone(),
+            networking.clone(),
+            router_builder,
+        );
+
         Service {
+            task_center,
             metadata,
             networking,
             incoming_messages,
+            cluster_state_refresher,
+            command_tx,
+            command_rx,
         }
     }
 }
 
-pub struct ClusterControllerHandle;
+enum ClusterControllerCommand {
+    GetClusterState(oneshot::Sender<Arc<ClusterState>>),
+}
+
+pub struct ClusterControllerHandle {
+    tx: mpsc::Sender<ClusterControllerCommand>,
+}
+
+impl ClusterControllerHandle {
+    pub async fn get_cluster_state(&self) -> Result<Arc<ClusterState>, ShutdownError> {
+        let (tx, rx) = oneshot::channel();
+        // ignore the send error; if the controller is gone, `rx` fails below
+        // and maps to ShutdownError.
+        let _ = self
+            .tx
+            .send(ClusterControllerCommand::GetClusterState(tx))
+            .await;
+        rx.await.map_err(|_| ShutdownError)
+    }
+}
 
 impl Service {
     pub fn handle(&self) -> ClusterControllerHandle {
-        ClusterControllerHandle
+        ClusterControllerHandle {
+            tx: self.command_tx.clone(),
+        }
     }
 
-    pub async fn run(mut self) -> anyhow::Result<()> {
+    pub async fn run(
+        mut self,
+        mut updateable_config: impl Updateable<AdminOptions>,
+    ) -> anyhow::Result<()> {
+        let options = updateable_config.load();
         // Make sure we have a partition table before starting
         let _ = self.metadata.wait_for_partition_table(Version::MIN).await?;
-
+        let mut heartbeat =
+            tokio::time::interval_at(Instant::now(), options.heartbeat_interval.into());
+        heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
 
         let mut shutdown = std::pin::pin!(cancellation_watcher());
-        let tc = task_center();
         loop {
             tokio::select! {
+                _ = heartbeat.tick() => {
+                    // Ignore the error if the system is shutting down
+                    let _ = self.cluster_state_refresher.schedule_refresh();
+                }
+                Some(cmd) = self.command_rx.recv() => {
+                    self.on_get_cluster_state(cmd);
+                }
                 Some(message) = self.incoming_messages.next() => {
                     let (from, message) = message.split();
-                    self.handle_attach_request(&tc, from, message)?;
+                    self.on_attach_request(from, message)?;
                 }
                 _ = &mut shutdown => {
                     return Ok(());
@@ -78,9 +138,16 @@ impl Service {
         }
     }
 
-    fn handle_attach_request(
+    fn on_get_cluster_state(&self, command: ClusterControllerCommand) {
+        match command {
+            ClusterControllerCommand::GetClusterState(tx) => {
+                let _ = tx.send(self.cluster_state_refresher.get_cluster_state());
+            }
+        }
+    }
+
+    fn on_attach_request(
         &mut self,
-        tc: &TaskCenter,
         from: GenerationalNodeId,
         request: AttachRequest,
     ) -> Result<(), ShutdownError> {
@@ -90,7 +157,7 @@ impl Service {
             .expect("partition table is loaded before run");
         let networking = self.networking.clone();
         let response = self.create_attachment_response(&partition_table, from, request.request_id);
-        tc.spawn(
+        self.task_center.spawn(
             restate_core::TaskKind::Disposable,
             "attachment-response",
             None,
diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/core/src/worker_api/partition_processor_manager.rs
index 1e1de711a..5486de0a4 100644
--- a/crates/core/src/worker_api/partition_processor_manager.rs
+++ b/crates/core/src/worker_api/partition_processor_manager.rs
@@ -8,14 +8,9 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use restate_types::identifiers::PartitionId;
 use tokio::sync::{mpsc, oneshot};
 
-use restate_node_protocol::cluster_controller::RunMode;
-use restate_types::identifiers::{LeaderEpoch, PartitionId};
-use restate_types::logs::Lsn;
-use restate_types::time::MillisSinceEpoch;
-use restate_types::GenerationalNodeId;
-
 use crate::ShutdownError;
 
 #[derive(Debug)]
@@ -23,42 +18,6 @@ pub enum ProcessorsManagerCommand {
     GetLivePartitions(oneshot::Sender<Vec<PartitionId>>),
 }
 
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub enum ReplayStatus {
-    Starting,
-    Active,
-    CatchingUp { target_tail_lsn: Lsn },
-}
-
-#[derive(Debug, Clone)]
-pub struct PartitionProcessorStatus {
-    pub updated_at: MillisSinceEpoch,
-    pub planned_mode: RunMode,
-    pub effective_mode: Option<RunMode>,
-    pub last_observed_leader_epoch: Option<LeaderEpoch>,
-    pub last_observed_leader_node: Option<GenerationalNodeId>,
-    pub last_applied_log_lsn: Option<Lsn>,
-    pub last_record_applied_at: Option<MillisSinceEpoch>,
-    pub skipped_records: u64,
-    pub replay_status: ReplayStatus,
-}
-
-impl PartitionProcessorStatus {
-    pub fn new(planned_mode: RunMode) -> Self {
-        Self {
-            updated_at: MillisSinceEpoch::now(),
-            planned_mode,
-            effective_mode: None,
-            last_observed_leader_epoch: None,
-            last_observed_leader_node: None,
-            last_applied_log_lsn: None,
-            last_record_applied_at: None,
-            skipped_records: 0,
-            replay_status: ReplayStatus::Starting,
-        }
-    }
-}
-
 #[derive(Debug, Clone)]
 pub struct ProcessorsManagerHandle(mpsc::Sender<ProcessorsManagerCommand>);
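`ClusterControllerHandle` in `service.rs` above follows the usual Tokio actor-handle shape: commands flow over a bounded `mpsc` channel, and each command carries a `oneshot` sender for its reply. Below is a minimal self-contained sketch of that request/response pattern, with hypothetical `Command` and `Handle` stand-ins rather than the real controller types:

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in command enum mirroring `ClusterControllerCommand`.
enum Command {
    GetValue(oneshot::Sender<u64>),
}

// Stand-in for `ClusterControllerHandle`: cheap to clone, owns only the sender.
#[derive(Clone)]
struct Handle {
    tx: mpsc::Sender<Command>,
}

impl Handle {
    async fn get_value(&self) -> Option<u64> {
        let (tx, rx) = oneshot::channel();
        // If the service is gone, the send fails and the reply never arrives.
        self.tx.send(Command::GetValue(tx)).await.ok()?;
        rx.await.ok()
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(2);
    let handle = Handle { tx };

    // The service owns the receiver and answers commands in its event loop.
    tokio::spawn(async move {
        while let Some(Command::GetValue(reply)) = rx.recv().await {
            let _ = reply.send(42);
        }
    });

    assert_eq!(handle.get_value().await, Some(42));
}
```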
diff --git a/crates/network/src/connection_manager.rs b/crates/network/src/connection_manager.rs
index 3a146fd62..985a0498c 100644
--- a/crates/network/src/connection_manager.rs
+++ b/crates/network/src/connection_manager.rs
@@ -448,12 +448,13 @@ where
             "task_id",
             tracing::field::display(current_task_id().unwrap()),
         );
+        let mut cancellation = std::pin::pin!(cancellation_watcher());
         // Receive loop
         loop {
             // read a message from the stream
             let msg = tokio::select! {
                 biased;
-                _ = cancellation_watcher() => {
+                _ = &mut cancellation => {
                     connection.send_control_frame(ConnectionControl::shutdown());
                     break;
                 },
diff --git a/crates/network/src/rpc_router.rs b/crates/network/src/rpc_router.rs
index 23e458bab..e7a78fb7d 100644
--- a/crates/network/src/rpc_router.rs
+++ b/crates/network/src/rpc_router.rs
@@ -32,6 +32,7 @@ use crate::Networking;
 /// tracking tokens if caller dropped the future.
 ///
 /// This type is designed to be used by senders of RpcRequest(s).
+#[derive(Clone)]
 pub struct RpcRouter<T>
 where
     T: RpcRequest,
diff --git a/crates/node-protocol/proto/common.proto b/crates/node-protocol/proto/common.proto
index 755ca2acc..f23bceb7f 100644
--- a/crates/node-protocol/proto/common.proto
+++ b/crates/node-protocol/proto/common.proto
@@ -34,5 +34,6 @@ enum TargetName {
   LOCAL_METADATA_STORE_CLIENT = 4;
   ATTACH_REQUEST = 5;
   ATTACH_RESPONSE = 6;
-  PARTITION_PROCESSOR_MANAGER_REQUESTS = 7;
+  GET_PROCESSORS_STATE_REQUEST = 7;
+  PROCESSORS_STATE_RESPONSE = 8;
 }
diff --git a/crates/node-protocol/src/cluster_controller.rs b/crates/node-protocol/src/cluster_controller.rs
index 6b70cc72f..4de6b1b8f 100644
--- a/crates/node-protocol/src/cluster_controller.rs
+++ b/crates/node-protocol/src/cluster_controller.rs
@@ -8,9 +8,11 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use restate_types::identifiers::PartitionId;
 use serde::{Deserialize, Serialize};
 
+use restate_types::identifiers::PartitionId;
+use restate_types::processors::RunMode;
+
 use crate::common::{KeyRange, RequestId, TargetName};
 use crate::define_rpc;
 
@@ -32,12 +34,6 @@ pub struct AttachResponse {
     pub actions: Vec<Action>,
 }
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
-pub enum RunMode {
-    Leader,
-    Follower,
-}
-
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum Action {
     RunPartition(RunPartition),
diff --git a/crates/node-protocol/src/lib.rs b/crates/node-protocol/src/lib.rs
index 15e101406..6bb6787f6 100644
--- a/crates/node-protocol/src/lib.rs
+++ b/crates/node-protocol/src/lib.rs
@@ -15,6 +15,7 @@ mod error;
 pub mod ingress;
 pub mod metadata;
 pub mod node;
+pub mod partition_processor_manager;
 
 // re-exports for convenience
 pub use common::CURRENT_PROTOCOL_VERSION;
diff --git a/crates/node-protocol/src/partition_processor_manager.rs b/crates/node-protocol/src/partition_processor_manager.rs
new file mode 100644
index 000000000..2b552c7a4
--- /dev/null
+++ b/crates/node-protocol/src/partition_processor_manager.rs
@@ -0,0 +1,39 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::collections::BTreeMap;
+
+use restate_types::identifiers::PartitionId;
+use restate_types::processors::PartitionProcessorStatus;
+use serde::{Deserialize, Serialize};
+use serde_with::serde_as;
+
+use crate::common::{RequestId, TargetName};
+use crate::define_rpc;
+
+define_rpc! {
+    @request = GetProcessorsState,
+    @response = ProcessorsStateResponse,
+    @request_target = TargetName::GetProcessorsStateRequest,
+    @response_target = TargetName::ProcessorsStateResponse,
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct GetProcessorsState {
+    pub request_id: RequestId,
+}
+
+#[serde_as]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ProcessorsStateResponse {
+    pub request_id: RequestId,
+    #[serde_as(as = "serde_with::Seq<(_, _)>")]
+    pub state: BTreeMap<PartitionId, PartitionProcessorStatus>,
+}
diff --git a/crates/node-services/proto/node_svc.proto b/crates/node-services/proto/node_svc.proto
index 5c2c05d74..0300aa2d2 100644
--- a/crates/node-services/proto/node_svc.proto
+++ b/crates/node-services/proto/node_svc.proto
@@ -46,4 +46,4 @@ message StorageQueryRequest { string query = 1; }
 message StorageQueryResponse {
   bytes header = 1;
   bytes data = 2;
-}
\ No newline at end of file
+}
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index 1e493dc23..a36072544 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -144,6 +144,7 @@ impl Node {
 
         let admin_role = if config.has_role(Role::Admin) {
             Some(AdminRole::new(
+                task_center(),
                 updateable_config.clone(),
                 metadata.clone(),
                 networking.clone(),
diff --git a/crates/node/src/network_server/handler/cluster_ctrl.rs b/crates/node/src/network_server/handler/cluster_ctrl.rs
index 748c6d8ab..8453f2779 100644
--- a/crates/node/src/network_server/handler/cluster_ctrl.rs
+++ b/crates/node/src/network_server/handler/cluster_ctrl.rs
@@ -8,16 +8,27 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use tonic::async_trait;
+use tonic::{async_trait, Request, Response, Status};
+use tracing::info;
 
+use restate_cluster_controller::ClusterControllerHandle;
+use restate_metadata_store::MetadataStoreClient;
 use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvc;
 use restate_node_services::cluster_ctrl::{ClusterStateRequest, ClusterStateResponse};
 
-pub struct ClusterCtrlSvcHandler {}
+use crate::network_server::AdminDependencies;
+
+pub struct ClusterCtrlSvcHandler {
+    _metadata_store_client: MetadataStoreClient,
+    controller_handle: ClusterControllerHandle,
+}
 
 impl ClusterCtrlSvcHandler {
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(admin_deps: AdminDependencies) -> Self {
+        Self {
+            controller_handle: admin_deps.cluster_controller_handle,
+            _metadata_store_client: admin_deps.metadata_store_client,
+        }
     }
 }
 
@@ -25,8 +36,17 @@ impl ClusterCtrlSvcHandler {
 impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
     async fn get_cluster_state(
         &self,
-        _request: tonic::Request<ClusterStateRequest>,
-    ) -> Result<tonic::Response<ClusterStateResponse>, tonic::Status> {
-        unimplemented!()
+        _request: Request<ClusterStateRequest>,
+    ) -> Result<Response<ClusterStateResponse>, Status> {
+        let cluster_state = self
+            .controller_handle
+            .get_cluster_state()
+            .await
+            .map_err(|_| tonic::Status::aborted("Node is shutting down"))?;
+
+        // todo: remove this and return the actual state via protobuf
+        info!("Cluster state: {:?}", cluster_state);
+
+        Ok(Response::new(ClusterStateResponse::default()))
     }
 }
diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs
index d338312e6..c45991f1b 100644
--- a/crates/node/src/network_server/service.rs
+++ b/crates/node/src/network_server/service.rs
@@ -82,11 +82,9 @@ impl NetworkServer {
                 .register_encoded_file_descriptor_set(cluster_ctrl::FILE_DESCRIPTOR_SET);
         }
 
-        let cluster_controller_service = if self.admin_deps.is_some() {
-            Some(ClusterCtrlSvcServer::new(ClusterCtrlSvcHandler::new()))
-        } else {
-            None
-        };
+        let cluster_controller_service = self
+            .admin_deps
+            .map(|admin_deps| ClusterCtrlSvcServer::new(ClusterCtrlSvcHandler::new(admin_deps)));
 
         let server_builder = tonic::transport::Server::builder()
             .layer(TraceLayer::new_for_grpc().make_span_with(span_factory))
@@ -139,7 +137,7 @@ impl WorkerDependencies {
 }
 
 pub struct AdminDependencies {
-    pub _cluster_controller_handle: ClusterControllerHandle,
+    pub cluster_controller_handle: ClusterControllerHandle,
     pub metadata_store_client: MetadataStoreClient,
 }
 
@@ -149,7 +147,7 @@ impl AdminDependencies {
         metadata_store_client: MetadataStoreClient,
     ) -> Self {
         AdminDependencies {
-            _cluster_controller_handle: cluster_controller_handle,
+            cluster_controller_handle,
             metadata_store_client,
         }
     }
diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs
index 1f186ae36..d62a153e7 100644
--- a/crates/node/src/roles/admin.rs
+++ b/crates/node/src/roles/admin.rs
@@ -19,7 +19,7 @@ use restate_admin::service::AdminService;
 use restate_bifrost::Bifrost;
 use restate_cluster_controller::ClusterControllerHandle;
 use restate_core::metadata_store::MetadataStoreClient;
-use restate_core::{task_center, Metadata, MetadataWriter, TaskKind};
+use restate_core::{task_center, Metadata, MetadataWriter, TaskCenter, TaskKind};
 use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
 use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
 use restate_service_protocol::discovery::ServiceDiscovery;
@@ -48,6 +48,7 @@ pub struct AdminRole {
 
 impl AdminRole {
     pub fn new(
+        task_center: TaskCenter,
        updateable_config: UpdateableConfiguration,
        metadata: Metadata,
        networking: Networking,
@@ -75,8 +76,12 @@ impl AdminRole {
             service_discovery,
         );
 
-        let controller =
-            restate_cluster_controller::Service::new(metadata, networking, router_builder);
+        let controller = restate_cluster_controller::Service::new(
+            task_center,
+            metadata,
+            networking,
+            router_builder,
+        );
 
         Ok(AdminRole {
             updateable_config,
@@ -100,7 +105,11 @@ impl AdminRole {
             TaskKind::SystemService,
             "cluster-controller-service",
             None,
-            self.controller.run(),
+            self.controller.run(
+                self.updateable_config
+                    .clone()
+                    .map_as_updateable_owned(|c| &c.admin),
+            ),
         )?;
 
         // todo: Make address configurable
diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs
index 58a94b915..94bbf3688 100644
--- a/crates/types/src/config/admin.rs
+++ b/crates/types/src/config/admin.rs
@@ -13,6 +13,7 @@ use serde_with::serde_as;
 use std::net::SocketAddr;
 use std::num::NonZeroUsize;
 use std::path::PathBuf;
+use std::time::Duration;
 use tokio::sync::Semaphore;
 
 use super::QueryEngineOptions;
@@ -39,6 +40,13 @@ pub struct AdminOptions {
     #[cfg(any(test, feature = "test-util"))]
     #[serde(skip, default = "super::default_arc_tmp")]
     data_dir: std::sync::Arc<tempfile::TempDir>,
+
+    /// # Controller heartbeats
+    ///
+    /// Controls the interval at which the cluster controller polls the nodes of the cluster.
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    #[cfg_attr(feature = "schemars", schemars(with = "String"))]
+    pub heartbeat_interval: humantime::Duration,
 }
 
 impl AdminOptions {
@@ -71,6 +79,7 @@ impl Default for AdminOptions {
             query_engine: Default::default(),
             #[cfg(any(test, feature = "test-util"))]
             data_dir: super::default_arc_tmp(),
+            heartbeat_interval: Duration::from_millis(1500).into(),
         }
     }
 }
diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs
index 03095ed82..b073dd386 100644
--- a/crates/types/src/lib.rs
+++ b/crates/types/src/lib.rs
@@ -32,6 +32,7 @@ pub mod metadata_store;
 pub mod net;
 pub mod nodes_config;
 pub mod partition_table;
+pub mod processors;
 pub mod retries;
 pub mod state_mut;
 pub mod storage;
diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs
index fff421aa5..7655e35fa 100644
--- a/crates/types/src/nodes_config.rs
+++ b/crates/types/src/nodes_config.rs
@@ -88,6 +88,10 @@ impl NodeConfig {
             roles,
         }
     }
+
+    pub fn has_role(&self, role: Role) -> bool {
+        self.roles.contains(role)
+    }
 }
 
 impl NodesConfiguration {
@@ -178,6 +182,16 @@ impl NodesConfiguration {
         })
     }
 
+    pub fn iter(&self) -> impl Iterator<Item = (PlainNodeId, &NodeConfig)> {
+        self.nodes.iter().filter_map(|(k, v)| {
+            if let MaybeNode::Node(node) = v {
+                Some((*k, node))
+            } else {
+                None
+            }
+        })
+    }
+
     /// Returns the maximum known plain node id.
     pub fn max_plain_node_id(&self) -> Option<PlainNodeId> {
         self.nodes.keys().max().cloned()
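The new `heartbeat_interval` above is a `humantime::Duration`, so the config file accepts human-readable values and round-trips them through `DisplayFromStr`; `Service::run` then turns it into a Tokio interval with `MissedTickBehavior::Delay`, so a stalled controller fires one late tick and re-aligns instead of bursting. A small sketch of that wiring, assuming the `humantime` and `tokio` crates; the literal `"1500ms"` mirrors the 1500 ms default above:

```rust
use std::str::FromStr;
use tokio::time::{Instant, MissedTickBehavior};

#[tokio::main]
async fn main() {
    // `humantime::Duration` parses values such as "1500ms" or "1s 500ms",
    // which is what `DisplayFromStr` feeds it from the config file.
    let heartbeat_interval = humantime::Duration::from_str("1500ms").expect("valid duration");

    // Same construction as in `Service::run`: `.into()` converts to a
    // `std::time::Duration`, and `Delay` re-aligns missed ticks rather than
    // firing a burst to catch up.
    let mut heartbeat = tokio::time::interval_at(Instant::now(), heartbeat_interval.into());
    heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);

    heartbeat.tick().await; // the first tick completes immediately
}
```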
diff --git a/crates/types/src/processors/mod.rs b/crates/types/src/processors/mod.rs
new file mode 100644
index 000000000..d98797c7c
--- /dev/null
+++ b/crates/types/src/processors/mod.rs
@@ -0,0 +1,58 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use serde::{Deserialize, Serialize};
+
+use crate::identifiers::LeaderEpoch;
+use crate::logs::Lsn;
+use crate::time::MillisSinceEpoch;
+use crate::GenerationalNodeId;
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
+pub enum RunMode {
+    Leader,
+    Follower,
+}
+
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+pub enum ReplayStatus {
+    Starting,
+    Active,
+    CatchingUp { target_tail_lsn: Lsn },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PartitionProcessorStatus {
+    pub updated_at: MillisSinceEpoch,
+    pub planned_mode: RunMode,
+    pub effective_mode: Option<RunMode>,
+    pub last_observed_leader_epoch: Option<LeaderEpoch>,
+    pub last_observed_leader_node: Option<GenerationalNodeId>,
+    pub last_applied_log_lsn: Option<Lsn>,
+    pub last_record_applied_at: Option<MillisSinceEpoch>,
+    pub skipped_records: u64,
+    pub replay_status: ReplayStatus,
+}
+
+impl PartitionProcessorStatus {
+    pub fn new(planned_mode: RunMode) -> Self {
+        Self {
+            updated_at: MillisSinceEpoch::now(),
+            planned_mode,
+            effective_mode: None,
+            last_observed_leader_epoch: None,
+            last_observed_leader_node: None,
+            last_applied_log_lsn: None,
+            last_record_applied_at: None,
+            skipped_records: 0,
+            replay_status: ReplayStatus::Starting,
+        }
+    }
+}
diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml
index 1bdb15f77..5b428ed3c 100644
--- a/crates/worker/Cargo.toml
+++ b/crates/worker/Cargo.toml
@@ -59,6 +59,7 @@ humantime = { workspace = true }
 metrics = { workspace = true }
 opentelemetry = { workspace = true }
 pin-project = { workspace = true }
+rand = { workspace = true }
 schemars = { workspace = true, optional = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs
index e8005002c..c1563628a 100644
--- a/crates/worker/src/lib.rs
+++ b/crates/worker/src/lib.rs
@@ -155,6 +155,7 @@ impl Worker {
         )?;
 
         let partition_processor_manager = PartitionProcessorManager::new(
+            task_center(),
             updateable_config.clone(),
             metadata.clone(),
             metadata_store_client,
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 8a0f29321..918924ad3 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -19,11 +19,10 @@ use assert2::let_assert;
 use futures::StreamExt;
 use metrics::{counter, histogram};
 use restate_core::metadata;
-use restate_core::worker_api::{PartitionProcessorStatus, ReplayStatus};
 use restate_network::Networking;
-use restate_node_protocol::cluster_controller::RunMode;
 use restate_partition_store::{PartitionStore, RocksDBTransaction};
 use restate_types::identifiers::{PartitionId, PartitionKey};
+use restate_types::processors::{PartitionProcessorStatus, ReplayStatus, RunMode};
 use restate_types::time::MillisSinceEpoch;
 use std::fmt::Debug;
 use std::marker::PhantomData;
@@ -159,7 +158,9 @@ where
             bifrost,
             networking,
         );
-        let mut status_update_timer = tokio::time::interval(Duration::from_millis(23));
+        // avoid synchronized timers: pick a randomized period between 500 and 1023 millis.
+        let mut status_update_timer =
+            tokio::time::interval(Duration::from_millis(500 + rand::random::<u64>() % 524));
         status_update_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
 
         let mut cancellation = std::pin::pin!(cancellation_watcher());
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index e077009eb..2b0d2ccc9 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -13,21 +13,27 @@ use std::ops::RangeInclusive;
 use std::time::Duration;
 
 use anyhow::Context;
+use futures::stream::BoxStream;
+use futures::stream::StreamExt;
+use restate_core::network::NetworkSender;
+use restate_core::TaskCenter;
 use restate_network::rpc_router::{RpcError, RpcRouter};
+use restate_node_protocol::partition_processor_manager::GetProcessorsState;
+use restate_node_protocol::partition_processor_manager::ProcessorsStateResponse;
+use restate_node_protocol::RpcMessage;
+use restate_types::processors::{PartitionProcessorStatus, RunMode};
 use tokio::sync::{mpsc, watch};
 use tracing::{debug, info, warn};
 
 use restate_bifrost::Bifrost;
 use restate_core::network::MessageRouterBuilder;
-use restate_core::worker_api::{
-    PartitionProcessorStatus, ProcessorsManagerCommand, ProcessorsManagerHandle,
-};
-use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskId, TaskKind};
+use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle};
+use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskId, TaskKind};
 use restate_invoker_impl::InvokerHandle;
 use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError};
 use restate_network::Networking;
 use restate_node_protocol::cluster_controller::AttachRequest;
-use restate_node_protocol::cluster_controller::{Action, AttachResponse, RunMode};
+use restate_node_protocol::cluster_controller::{Action, AttachResponse};
 use restate_node_protocol::MessageEnvelope;
 use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager};
 use restate_types::arc_util::ArcSwapExt;
@@ -46,6 +52,7 @@ use crate::partition::PartitionProcessorControlCommand;
 use crate::PartitionProcessor;
 
 pub struct PartitionProcessorManager {
+    task_center: TaskCenter,
     updateable_config: UpdateableConfiguration,
     running_partition_processors: BTreeMap<PartitionId, State>,
 
     metadata: Metadata,
     metadata_store_client: MetadataStoreClient,
     partition_store_manager: PartitionStoreManager,
     attach_router: RpcRouter<AttachRequest>,
+    incoming_get_state: BoxStream<'static, MessageEnvelope<GetProcessorsState>>,
     networking: Networking,
     bifrost: Bifrost,
     invoker_handle: InvokerHandle<InvokerStorageReader<PartitionStore>>,
@@ -73,13 +81,14 @@ struct State {
     _created_at: MillisSinceEpoch,
     _key_range: RangeInclusive<PartitionKey>,
     _control_tx: mpsc::Sender<PartitionProcessorControlCommand>,
-    _watch_rx: watch::Receiver<PartitionProcessorStatus>,
+    watch_rx: watch::Receiver<PartitionProcessorStatus>,
     _task_id: TaskId,
 }
 
 impl PartitionProcessorManager {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
+        task_center: TaskCenter,
         updateable_config: UpdateableConfiguration,
         metadata: Metadata,
         metadata_store_client: MetadataStoreClient,
@@ -90,14 +99,17 @@ impl PartitionProcessorManager {
         invoker_handle: InvokerHandle<InvokerStorageReader<PartitionStore>>,
     ) -> Self {
         let attach_router = RpcRouter::new(networking.clone(), router_builder);
+        let incoming_get_state = router_builder.subscribe_to_stream(2);
 
         let (tx, rx) = mpsc::channel(updateable_config.load().worker.internal_queue_length());
         Self {
+            task_center,
             updateable_config,
             running_partition_processors: BTreeMap::default(),
             metadata,
             metadata_store_client,
             partition_store_manager,
+            incoming_get_state,
             networking,
             bifrost,
             invoker_handle,
@@ -174,6 +186,9 @@ impl PartitionProcessorManager {
                     self.on_command(command);
                     debug!("PartitionProcessorManager shutting down");
                 }
+                Some(get_state) = self.incoming_get_state.next() => {
+                    self.on_get_state(get_state);
+                }
                 _ = &mut shutdown => {
                     return Ok(());
                 }
             }
         }
     }
 
+    fn on_get_state(&self, get_state_msg: MessageEnvelope<GetProcessorsState>) {
+        let (from, msg) = get_state_msg.split();
+        // For all running partitions, collect state and send it back.
+        let state: BTreeMap<PartitionId, PartitionProcessorStatus> = self
+            .running_partition_processors
+            .iter()
+            .map(|(partition_id, state)| {
+                let status = state.watch_rx.borrow().clone();
+                (*partition_id, status)
+            })
+            .collect();
+
+        let response = ProcessorsStateResponse {
+            request_id: msg.correlation_id(),
+            state,
+        };
+        let networking = self.networking.clone();
+        // ignore shutdown errors.
+        let _ = self.task_center.spawn(
+            restate_core::TaskKind::Disposable,
+            "get-processors-state-response",
+            None,
+            async move { Ok(networking.send(from.into(), &response).await?) },
+        );
+    }
+
     fn on_command(&mut self, command: ProcessorsManagerCommand) {
         use ProcessorsManagerCommand::*;
         match command {
@@ -220,7 +261,7 @@ impl PartitionProcessorManager {
             _key_range: action.key_range_inclusive.clone().into(),
             _task_id,
             _control_tx: control_tx,
-            _watch_rx: watch_rx,
+            watch_rx,
         };
         self.running_partition_processors
             .insert(action.partition_id, state);
@@ -261,7 +302,7 @@ impl PartitionProcessorManager {
         let metadata_store_client = self.metadata_store_client.clone();
         let node_id = self.metadata.my_node_id();
 
-        task_center().spawn_child(
+        self.task_center.spawn_child(
             TaskKind::PartitionProcessor,
             "partition-processor",
             Some(processor.partition_id),
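The `watch_rx` rename above (dropping the leading underscore) is what makes `on_get_state` possible: each partition processor publishes its latest `PartitionProcessorStatus` into a `watch` channel on its randomized timer, and the manager snapshots every receiver with `borrow().clone()` without ever awaiting the processors. A minimal sketch of that pattern, with a simplified stand-in `Status` type rather than the real one:

```rust
use tokio::sync::watch;

// Stand-in for `PartitionProcessorStatus`.
#[derive(Debug, Clone, Default)]
struct Status {
    applied_records: u64,
}

#[tokio::main]
async fn main() {
    // The processor keeps the sender; the manager stores the receiver in its
    // per-partition `State`, like `watch_rx` above.
    let (status_tx, status_rx) = watch::channel(Status::default());

    // Processor side: overwrite the slot on each status-update tick. A watch
    // channel keeps only the latest value, so slow readers never build a backlog.
    status_tx.send(Status { applied_records: 10 }).ok();
    status_tx.send(Status { applied_records: 42 }).ok();

    // Manager side: `borrow().clone()` takes a point-in-time snapshot without
    // waiting, which is what `on_get_state` does for every running partition.
    let snapshot = status_rx.borrow().clone();
    assert_eq!(snapshot.applied_records, 42);
}
```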