diff --git a/Cargo.lock b/Cargo.lock index 50b6bc7a1..927b4fc0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5375,12 +5375,15 @@ name = "restate-cluster-controller" version = "0.9.1" dependencies = [ "anyhow", + "arc-swap", "codederror", "derive_builder", "drain", "futures", "restate-core", "restate-errors", + "restate-network", + "restate-node-protocol", "restate-types", "schemars", "serde", @@ -5827,6 +5830,7 @@ dependencies = [ "restate-schema", "restate-types", "serde", + "serde_with", "strum 0.26.2", "strum_macros 0.26.2", "thiserror", @@ -6360,6 +6364,7 @@ dependencies = [ "opentelemetry", "pin-project", "prost", + "rand", "restate-bifrost", "restate-core", "restate-errors", diff --git a/crates/cluster-controller/Cargo.toml b/crates/cluster-controller/Cargo.toml index acc2dae8a..35712db5e 100644 --- a/crates/cluster-controller/Cargo.toml +++ b/crates/cluster-controller/Cargo.toml @@ -14,9 +14,12 @@ options_schema = ["dep:schemars"] [dependencies] restate-core = { workspace = true } restate-errors = { workspace = true } +restate-network = { workspace = true } +restate-node-protocol = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true } +arc-swap = { workspace = true } codederror = { workspace = true } derive_builder = { workspace = true } drain = { workspace = true } diff --git a/crates/cluster-controller/src/cluster_state.rs b/crates/cluster-controller/src/cluster_state.rs new file mode 100644 index 000000000..16aed96ed --- /dev/null +++ b/crates/cluster-controller/src/cluster_state.rs @@ -0,0 +1,230 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use arc_swap::ArcSwap; +use restate_core::network::MessageRouterBuilder; +use restate_network::rpc_router::RpcRouter; +use restate_node_protocol::partition_processor_manager::GetProcessorsState; +use restate_types::identifiers::PartitionId; +use restate_types::nodes_config::Role; +use restate_types::processors::PartitionProcessorStatus; +use restate_types::time::MillisSinceEpoch; +use tokio::task::JoinHandle; +use tokio::time::Instant; + +use restate_core::{Metadata, ShutdownError, TaskCenter}; +use restate_network::Networking; +use restate_types::{GenerationalNodeId, PlainNodeId, Version}; + +/// A container for health information about every node and partition in the +/// cluster. +#[derive(Debug, Clone)] +pub struct ClusterState { + pub last_refreshed: Option, + pub nodes_config_version: Version, + pub partition_table_version: Version, + pub nodes: BTreeMap, +} + +impl ClusterState { + pub fn is_reliable(&self) -> bool { + // todo: make this configurable + // If the cluster state is older than 10 seconds, then it is not reliable. 
+ self.last_refreshed + .map(|last_refreshed| last_refreshed.elapsed().as_secs() < 10) + .unwrap_or(false) + } +} + +#[derive(Debug, Clone)] +pub enum NodeState { + Alive { + last_heartbeat_at: MillisSinceEpoch, + generation: GenerationalNodeId, + partitions: BTreeMap, + }, + Dead { + last_seen_alive: Option, + }, +} + +pub struct ClusterStateRefresher { + task_center: TaskCenter, + metadata: Metadata, + get_state_router: RpcRouter, + updateable_cluster_state: Arc>, + in_flight_refresh: Option>, +} + +impl ClusterStateRefresher { + pub fn new( + task_center: TaskCenter, + metadata: Metadata, + networking: Networking, + router_builder: &mut MessageRouterBuilder, + ) -> Self { + let get_state_router = RpcRouter::new(networking.clone(), router_builder); + + let initial_state = ClusterState { + last_refreshed: None, + nodes_config_version: Version::INVALID, + partition_table_version: Version::INVALID, + nodes: BTreeMap::new(), + }; + let updateable_cluster_state = Arc::new(ArcSwap::from_pointee(initial_state)); + + Self { + task_center, + metadata, + get_state_router, + updateable_cluster_state, + in_flight_refresh: None, + } + } + + pub fn get_cluster_state(&self) -> Arc { + self.updateable_cluster_state.load_full() + } + + pub fn schedule_refresh(&mut self) -> Result<(), ShutdownError> { + // if in-flight refresh is happening, then ignore. + if let Some(handle) = &self.in_flight_refresh { + if handle.is_finished() { + self.in_flight_refresh = None; + } else { + // still in flight. + return Ok(()); + } + } + + self.in_flight_refresh = Self::start_refresh_task( + self.task_center.clone(), + self.get_state_router.clone(), + self.updateable_cluster_state.clone(), + self.metadata.clone(), + )?; + + Ok(()) + } + + fn start_refresh_task( + tc: TaskCenter, + get_state_router: RpcRouter, + updateable_cluster_state: Arc>, + metadata: Metadata, + ) -> Result>, ShutdownError> { + let task_center = tc.clone(); + let refresh = async move { + let last_state = updateable_cluster_state.load(); + // make sure we have a partition table that equals or newer than last refresh + let partition_table = metadata + .wait_for_partition_table(last_state.partition_table_version) + .await?; + let _ = metadata + .wait_for_version( + restate_core::MetadataKind::NodesConfiguration, + last_state.nodes_config_version, + ) + .await; + let nodes_config = metadata.nodes_config(); + + let mut nodes = BTreeMap::new(); + let mut join_set = tokio::task::JoinSet::new(); + for (node_id, node) in nodes_config.iter() { + // We are only interested in worker nodes. + if !node.has_role(Role::Worker) { + continue; + } + + let rpc_router = get_state_router.clone(); + let tc = tc.clone(); + join_set + .build_task() + .name("get-processors-state") + .spawn(async move { + tc.run_in_scope("get-processor-state", None, async move { + ( + node_id, + tokio::time::timeout( + // todo: make configurable + std::time::Duration::from_secs(1), + rpc_router.call(node_id.into(), &GetProcessorsState::default()), + ) + .await, + ) + }) + .await + }) + .expect("to spawn task"); + } + while let Some(Ok((node_id, res))) = join_set.join_next().await { + // Did the node timeout? consider it dead + // Important note: This is a naive mechanism for failure detection, this should be + // considered a temporary design until we get to build a more robust detection that + // accounts for flaky or bouncy nodes, but we assume that the timeout is large + // enough that a single timeout signifies a dead node. + // + // The node gets the same treatment on other RpcErrors. 
+ let Ok(Ok(res)) = res else { + // determine last seen alive. + let last_seen_alive = + last_state + .nodes + .get(&node_id) + .and_then(|state| match state { + NodeState::Alive { + last_heartbeat_at, .. + } => Some(*last_heartbeat_at), + NodeState::Dead { last_seen_alive } => *last_seen_alive, + }); + + nodes.insert(node_id, NodeState::Dead { last_seen_alive }); + continue; + }; + + let (from, msg) = res.split(); + + nodes.insert( + node_id, + NodeState::Alive { + last_heartbeat_at: MillisSinceEpoch::now(), + generation: from, + partitions: msg.state, + }, + ); + } + + let state = ClusterState { + last_refreshed: Some(Instant::now()), + nodes_config_version: nodes_config.version(), + partition_table_version: partition_table.version(), + nodes, + }; + + // publish the new state + updateable_cluster_state.store(Arc::new(state)); + Ok(()) + }; + + let task_id = task_center.spawn( + restate_core::TaskKind::Disposable, + "cluster-state-refresh", + None, + refresh, + )?; + + // If this returned None, it means that the task completed or has been + // cancelled before we get to this point. + Ok(task_center.take_task(task_id)) + } +} diff --git a/crates/cluster-controller/src/lib.rs b/crates/cluster-controller/src/lib.rs index ac9ac141f..dff387bb3 100644 --- a/crates/cluster-controller/src/lib.rs +++ b/crates/cluster-controller/src/lib.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod cluster_state; mod service; pub use service::{ClusterControllerHandle, Error, Service}; diff --git a/crates/cluster-controller/src/service.rs b/crates/cluster-controller/src/service.rs index d5c287ec5..c5602b35f 100644 --- a/crates/cluster-controller/src/service.rs +++ b/crates/cluster-controller/src/service.rs @@ -8,8 +8,31 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::sync::Arc; + use codederror::CodedError; -use restate_core::cancellation_watcher; +use futures::stream::BoxStream; +use futures::StreamExt; +use tokio::time::Instant; + +use restate_network::Networking; +use restate_node_protocol::cluster_controller::{ + Action, AttachRequest, AttachResponse, RunPartition, +}; +use restate_node_protocol::common::{KeyRange, RequestId}; +use restate_types::arc_util::Updateable; +use restate_types::config::AdminOptions; +use restate_types::partition_table::FixedPartitionTable; + +use restate_core::network::{MessageRouterBuilder, NetworkSender}; +use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskCenter}; +use restate_node_protocol::MessageEnvelope; +use restate_types::processors::RunMode; +use restate_types::{GenerationalNodeId, Version}; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::MissedTickBehavior; + +use crate::cluster_state::{ClusterState, ClusterStateRefresher}; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -18,19 +41,154 @@ pub enum Error { Error, } -#[derive(Debug, Default)] -pub struct Service {} +pub struct Service { + task_center: TaskCenter, + metadata: Metadata, + networking: Networking, + incoming_messages: BoxStream<'static, MessageEnvelope>, + cluster_state_refresher: ClusterStateRefresher, + command_tx: mpsc::Sender, + command_rx: mpsc::Receiver, +} + +impl Service { + pub fn new( + task_center: TaskCenter, + metadata: Metadata, + networking: Networking, + router_builder: &mut MessageRouterBuilder, + ) -> Self { + let incoming_messages = router_builder.subscribe_to_stream(10); + let (command_tx, command_rx) = mpsc::channel(2); + + let cluster_state_refresher = ClusterStateRefresher::new( + task_center.clone(), + metadata.clone(), + networking.clone(), + router_builder, + ); + + Service { + task_center, + metadata, + networking, + incoming_messages, + cluster_state_refresher, + command_tx, + command_rx, + } + } +} -// todo: Replace with proper handle -pub struct ClusterControllerHandle; +enum ClusterControllerCommand { + GetClusterState(oneshot::Sender>), +} + +pub struct ClusterControllerHandle { + tx: mpsc::Sender, +} + +impl ClusterControllerHandle { + pub async fn get_cluster_state(&self) -> Result, ShutdownError> { + let (tx, rx) = oneshot::channel(); + // ignore the error, we own both tx and rx at this point. + let _ = self + .tx + .send(ClusterControllerCommand::GetClusterState(tx)) + .await; + rx.await.map_err(|_| ShutdownError) + } +} impl Service { pub fn handle(&self) -> ClusterControllerHandle { - ClusterControllerHandle + ClusterControllerHandle { + tx: self.command_tx.clone(), + } } - pub async fn run(self) -> anyhow::Result<()> { - let _ = cancellation_watcher().await; + pub async fn run( + mut self, + mut updateable_config: impl Updateable, + ) -> anyhow::Result<()> { + let options = updateable_config.load(); + // Make sure we have partition table before starting + let _ = self.metadata.wait_for_partition_table(Version::MIN).await?; + let mut heartbeat = + tokio::time::interval_at(Instant::now(), options.heartbeat_interval.into()); + heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay); + let mut shutdown = std::pin::pin!(cancellation_watcher()); + loop { + tokio::select! 
{ + _ = heartbeat.tick() => { + // Ignore error if system is shutting down + let _ = self.cluster_state_refresher.schedule_refresh(); + } + Some(cmd) = self.command_rx.recv() => { + self.on_get_cluster_state(cmd); + } + Some(message) = self.incoming_messages.next() => { + let (from, message) = message.split(); + self.on_attach_request(from, message)?; + } + _ = &mut shutdown => { + return Ok(()); + } + } + } + } + + fn on_get_cluster_state(&self, command: ClusterControllerCommand) { + match command { + ClusterControllerCommand::GetClusterState(tx) => { + let _ = tx.send(self.cluster_state_refresher.get_cluster_state()); + } + } + } + + fn on_attach_request( + &mut self, + from: GenerationalNodeId, + request: AttachRequest, + ) -> Result<(), ShutdownError> { + let partition_table = self + .metadata + .partition_table() + .expect("partition table is loaded before run"); + let networking = self.networking.clone(); + let response = self.create_attachment_response(&partition_table, from, request.request_id); + self.task_center.spawn( + restate_core::TaskKind::Disposable, + "attachment-response", + None, + async move { Ok(networking.send(from.into(), &response).await?) }, + )?; Ok(()) } + + fn create_attachment_response( + &self, + partition_table: &FixedPartitionTable, + _node: GenerationalNodeId, + request_id: RequestId, + ) -> AttachResponse { + // simulating a plan after initial attachement + let actions = partition_table + .partitioner() + .map(|(partition_id, key_range)| { + Action::RunPartition(RunPartition { + partition_id, + key_range_inclusive: KeyRange { + from: *key_range.start(), + to: *key_range.end(), + }, + mode: RunMode::Leader, + }) + }) + .collect(); + AttachResponse { + request_id, + actions, + } + } } diff --git a/crates/network/src/connection_manager.rs b/crates/network/src/connection_manager.rs index 3a146fd62..985a0498c 100644 --- a/crates/network/src/connection_manager.rs +++ b/crates/network/src/connection_manager.rs @@ -448,12 +448,13 @@ where "task_id", tracing::field::display(current_task_id().unwrap()), ); + let mut cancellation = std::pin::pin!(cancellation_watcher()); // Receive loop loop { // read a message from the stream let msg = tokio::select! { biased; - _ = cancellation_watcher() => { + _ = &mut cancellation => { connection.send_control_frame(ConnectionControl::shutdown()); break; }, diff --git a/crates/network/src/rpc_router.rs b/crates/network/src/rpc_router.rs index 23e458bab..e7a78fb7d 100644 --- a/crates/network/src/rpc_router.rs +++ b/crates/network/src/rpc_router.rs @@ -32,6 +32,7 @@ use crate::Networking; /// tracking tokens if caller dropped the future. /// /// This type is designed to be used by senders of RpcRequest(s). 
+#[derive(Clone)] pub struct RpcRouter where T: RpcRequest, diff --git a/crates/node-protocol/Cargo.toml b/crates/node-protocol/Cargo.toml index 678720cac..93789eb65 100644 --- a/crates/node-protocol/Cargo.toml +++ b/crates/node-protocol/Cargo.toml @@ -23,6 +23,7 @@ flexbuffers = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } serde = { workspace = true} +serde_with = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } diff --git a/crates/node-protocol/proto/common.proto b/crates/node-protocol/proto/common.proto index 42ea2e1d0..f23bceb7f 100644 --- a/crates/node-protocol/proto/common.proto +++ b/crates/node-protocol/proto/common.proto @@ -22,16 +22,18 @@ message NodeId { } // A generic type for versioned metadata -message Version { - uint32 value = 1; -} +message Version { uint32 value = 1; } -// The target service for a message +// The handle name or type tag of the message. For every target there must be +// exactly one message handler implementation. enum TargetName { TargetName_UNKNOWN = 0; METADATA_MANAGER = 1; INGRESS = 2; LOCAL_METADATA_STORE = 3; LOCAL_METADATA_STORE_CLIENT = 4; + ATTACH_REQUEST = 5; + ATTACH_RESPONSE = 6; + GET_PROCESSORS_STATE_REQUEST = 7; + PROCESSORS_STATE_RESPONSE = 8; } - diff --git a/crates/node-protocol/src/cluster_controller.rs b/crates/node-protocol/src/cluster_controller.rs new file mode 100644 index 000000000..4de6b1b8f --- /dev/null +++ b/crates/node-protocol/src/cluster_controller.rs @@ -0,0 +1,47 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use serde::{Deserialize, Serialize}; + +use restate_types::identifiers::PartitionId; +use restate_types::processors::RunMode; + +use crate::common::{KeyRange, RequestId, TargetName}; +use crate::define_rpc; + +define_rpc! { + @request = AttachRequest, + @response = AttachResponse, + @request_target = TargetName::AttachRequest, + @response_target = TargetName::AttachResponse, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct AttachRequest { + pub request_id: RequestId, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AttachResponse { + pub request_id: RequestId, + pub actions: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Action { + RunPartition(RunPartition), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RunPartition { + pub partition_id: PartitionId, + pub key_range_inclusive: KeyRange, + pub mode: RunMode, +} diff --git a/crates/node-protocol/src/common.rs b/crates/node-protocol/src/common.rs index d983b5ea5..85db58d36 100644 --- a/crates/node-protocol/src/common.rs +++ b/crates/node-protocol/src/common.rs @@ -8,8 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::ops::RangeInclusive; use std::sync::atomic::AtomicUsize; +use restate_types::identifiers::PartitionKey; + include!(concat!(env!("OUT_DIR"), "/dev.restate.common.rs")); pub static MIN_SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Flexbuffers; @@ -105,6 +108,18 @@ impl From for NodeId { } } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KeyRange { + pub from: PartitionKey, + pub to: PartitionKey, +} + +impl From for RangeInclusive { + fn from(val: KeyRange) -> Self { + RangeInclusive::new(val.from, val.to) + } +} + // write tests for RequestId #[cfg(test)] mod tests { diff --git a/crates/node-protocol/src/lib.rs b/crates/node-protocol/src/lib.rs index 6f9762e1c..6bb6787f6 100644 --- a/crates/node-protocol/src/lib.rs +++ b/crates/node-protocol/src/lib.rs @@ -8,12 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +pub mod cluster_controller; pub mod codec; pub mod common; mod error; pub mod ingress; pub mod metadata; pub mod node; +pub mod partition_processor_manager; // re-exports for convenience pub use common::CURRENT_PROTOCOL_VERSION; @@ -50,6 +52,10 @@ impl MessageEnvelope { pub fn split(self) -> (GenerationalNodeId, M) { (self.peer, self.body) } + + pub fn body(&self) -> &M { + &self.body + } } impl MessageEnvelope { diff --git a/crates/node-protocol/src/partition_processor_manager.rs b/crates/node-protocol/src/partition_processor_manager.rs new file mode 100644 index 000000000..2b552c7a4 --- /dev/null +++ b/crates/node-protocol/src/partition_processor_manager.rs @@ -0,0 +1,39 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; + +use restate_types::identifiers::PartitionId; +use restate_types::processors::PartitionProcessorStatus; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; + +use crate::common::{RequestId, TargetName}; +use crate::define_rpc; + +define_rpc! 
{ + @request = GetProcessorsState, + @response = ProcessorsStateResponse, + @request_target = TargetName::GetProcessorsStateRequest, + @response_target = TargetName::ProcessorsStateResponse, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct GetProcessorsState { + pub request_id: RequestId, +} + +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProcessorsStateResponse { + pub request_id: RequestId, + #[serde_as(as = "serde_with::Seq<(_, _)>")] + pub state: BTreeMap, +} diff --git a/crates/node-services/proto/cluster_ctrl_svc.proto b/crates/node-services/proto/cluster_ctrl_svc.proto index 980527336..263cc86f8 100644 --- a/crates/node-services/proto/cluster_ctrl_svc.proto +++ b/crates/node-services/proto/cluster_ctrl_svc.proto @@ -14,13 +14,9 @@ import "common.proto"; package dev.restate.cluster_ctrl; service ClusterCtrlSvc { - // Attach worker at cluster controller - rpc AttachNode(AttachmentRequest) returns (AttachmentResponse); - + rpc GetClusterState(ClusterStateRequest) returns (ClusterStateResponse); } -message AttachmentRequest { - optional dev.restate.common.NodeId node_id = 1; -} +message ClusterStateRequest {} -message AttachmentResponse {} +message ClusterStateResponse {} diff --git a/crates/node-services/proto/node_svc.proto b/crates/node-services/proto/node_svc.proto index 5c2c05d74..0300aa2d2 100644 --- a/crates/node-services/proto/node_svc.proto +++ b/crates/node-services/proto/node_svc.proto @@ -46,4 +46,4 @@ message StorageQueryRequest { string query = 1; } message StorageQueryResponse { bytes header = 1; bytes data = 2; -} \ No newline at end of file +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 2c0dd1a11..a36072544 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -144,8 +144,12 @@ impl Node { let admin_role = if config.has_role(Role::Admin) { Some(AdminRole::new( + task_center(), updateable_config.clone(), + metadata.clone(), + networking.clone(), metadata_manager.writer(), + &mut router_builder, metadata_store_client.clone(), )?) } else { diff --git a/crates/node/src/network_server/handler/cluster_ctrl.rs b/crates/node/src/network_server/handler/cluster_ctrl.rs index 8be95b29b..8453f2779 100644 --- a/crates/node/src/network_server/handler/cluster_ctrl.rs +++ b/crates/node/src/network_server/handler/cluster_ctrl.rs @@ -9,27 +9,44 @@ // by the Apache License, Version 2.0. 
use tonic::{async_trait, Request, Response, Status}; -use tracing::debug; +use tracing::info; +use restate_cluster_controller::ClusterControllerHandle; +use restate_metadata_store::MetadataStoreClient; use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvc; -use restate_node_services::cluster_ctrl::{AttachmentRequest, AttachmentResponse}; +use restate_node_services::cluster_ctrl::{ClusterStateRequest, ClusterStateResponse}; -pub struct ClusterCtrlSvcHandler {} +use crate::network_server::AdminDependencies; + +pub struct ClusterCtrlSvcHandler { + _metadata_store_client: MetadataStoreClient, + controller_handle: ClusterControllerHandle, +} impl ClusterCtrlSvcHandler { - pub fn new() -> Self { - Self {} + pub fn new(admin_deps: AdminDependencies) -> Self { + Self { + controller_handle: admin_deps.cluster_controller_handle, + _metadata_store_client: admin_deps.metadata_store_client, + } } } #[async_trait] impl ClusterCtrlSvc for ClusterCtrlSvcHandler { - async fn attach_node( + async fn get_cluster_state( &self, - request: Request, - ) -> Result, Status> { - let node_id = request.into_inner().node_id.expect("node id must be set"); - debug!("Attaching node '{:?}'", node_id); - Ok(Response::new(AttachmentResponse {})) + _request: Request, + ) -> Result, Status> { + let cluster_state = self + .controller_handle + .get_cluster_state() + .await + .map_err(|_| tonic::Status::aborted("Node is shutting down"))?; + + // todo: remove this and return the actual state via protobuf + info!("Cluster state: {:?}", cluster_state); + + Ok(Response::new(ClusterStateResponse::default())) } } diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index d338312e6..c45991f1b 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -82,11 +82,9 @@ impl NetworkServer { .register_encoded_file_descriptor_set(cluster_ctrl::FILE_DESCRIPTOR_SET); } - let cluster_controller_service = if self.admin_deps.is_some() { - Some(ClusterCtrlSvcServer::new(ClusterCtrlSvcHandler::new())) - } else { - None - }; + let cluster_controller_service = self + .admin_deps + .map(|admin_deps| ClusterCtrlSvcServer::new(ClusterCtrlSvcHandler::new(admin_deps))); let server_builder = tonic::transport::Server::builder() .layer(TraceLayer::new_for_grpc().make_span_with(span_factory)) @@ -139,7 +137,7 @@ impl WorkerDependencies { } pub struct AdminDependencies { - pub _cluster_controller_handle: ClusterControllerHandle, + pub cluster_controller_handle: ClusterControllerHandle, pub metadata_store_client: MetadataStoreClient, } @@ -149,7 +147,7 @@ impl AdminDependencies { metadata_store_client: MetadataStoreClient, ) -> Self { AdminDependencies { - _cluster_controller_handle: cluster_controller_handle, + cluster_controller_handle, metadata_store_client, } } diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index 5477e1b50..d62a153e7 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -10,7 +10,8 @@ use anyhow::Context; use codederror::CodedError; -use restate_types::arc_util::ArcSwapExt; +use restate_core::network::MessageRouterBuilder; +use restate_network::Networking; use std::time::Duration; use tonic::transport::Channel; @@ -18,10 +19,11 @@ use restate_admin::service::AdminService; use restate_bifrost::Bifrost; use restate_cluster_controller::ClusterControllerHandle; use restate_core::metadata_store::MetadataStoreClient; -use restate_core::{task_center, MetadataWriter, 
TaskKind}; +use restate_core::{task_center, Metadata, MetadataWriter, TaskCenter, TaskKind}; use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_service_protocol::discovery::ServiceDiscovery; +use restate_types::arc_util::ArcSwapExt; use restate_types::config::{IngressOptions, UpdateableConfiguration}; use restate_types::retries::RetryPolicy; @@ -46,8 +48,12 @@ pub struct AdminRole { impl AdminRole { pub fn new( + task_center: TaskCenter, updateable_config: UpdateableConfiguration, + metadata: Metadata, + networking: Networking, metadata_writer: MetadataWriter, + router_builder: &mut MessageRouterBuilder, metadata_store_client: MetadataStoreClient, ) -> Result { let config = updateable_config.pinned(); @@ -70,9 +76,16 @@ impl AdminRole { service_discovery, ); + let controller = restate_cluster_controller::Service::new( + task_center, + metadata, + networking, + router_builder, + ); + Ok(AdminRole { updateable_config, - controller: restate_cluster_controller::Service::default(), + controller, admin, }) } @@ -92,7 +105,11 @@ impl AdminRole { TaskKind::SystemService, "cluster-controller-service", None, - self.controller.run(), + self.controller.run( + self.updateable_config + .clone() + .map_as_updateable_owned(|c| &c.admin), + ), )?; // todo: Make address configurable diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs index 58a94b915..94bbf3688 100644 --- a/crates/types/src/config/admin.rs +++ b/crates/types/src/config/admin.rs @@ -13,6 +13,7 @@ use serde_with::serde_as; use std::net::SocketAddr; use std::num::NonZeroUsize; use std::path::PathBuf; +use std::time::Duration; use tokio::sync::Semaphore; use super::QueryEngineOptions; @@ -39,6 +40,13 @@ pub struct AdminOptions { #[cfg(any(test, feature = "test-util"))] #[serde(skip, default = "super::default_arc_tmp")] data_dir: std::sync::Arc, + + /// # Controller heartbeats + /// + /// Controls the interval at which cluster controller polls nodes of the cluster. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub heartbeat_interval: humantime::Duration, } impl AdminOptions { @@ -71,6 +79,7 @@ impl Default for AdminOptions { query_engine: Default::default(), #[cfg(any(test, feature = "test-util"))] data_dir: super::default_arc_tmp(), + heartbeat_interval: Duration::from_millis(1500).into(), } } } diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index 42d129abd..3426e3f26 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -33,6 +33,7 @@ pub mod metadata_store; pub mod net; pub mod nodes_config; pub mod partition_table; +pub mod processors; pub mod retries; pub mod service_discovery; pub mod service_protocol; diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index fff421aa5..7655e35fa 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -88,6 +88,10 @@ impl NodeConfig { roles, } } + + pub fn has_role(&self, role: Role) -> bool { + self.roles.contains(role) + } } impl NodesConfiguration { @@ -178,6 +182,16 @@ impl NodesConfiguration { }) } + pub fn iter(&self) -> impl Iterator { + self.nodes.iter().filter_map(|(k, v)| { + if let MaybeNode::Node(node) = v { + Some((*k, node)) + } else { + None + } + }) + } + /// Returns the maximum known plain node id. 
pub fn max_plain_node_id(&self) -> Option { self.nodes.keys().max().cloned() diff --git a/crates/types/src/processors/mod.rs b/crates/types/src/processors/mod.rs new file mode 100644 index 000000000..d98797c7c --- /dev/null +++ b/crates/types/src/processors/mod.rs @@ -0,0 +1,58 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use serde::{Deserialize, Serialize}; + +use crate::identifiers::LeaderEpoch; +use crate::logs::Lsn; +use crate::time::MillisSinceEpoch; +use crate::GenerationalNodeId; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)] +pub enum RunMode { + Leader, + Follower, +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub enum ReplayStatus { + Starting, + Active, + CatchingUp { target_tail_lsn: Lsn }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PartitionProcessorStatus { + pub updated_at: MillisSinceEpoch, + pub planned_mode: RunMode, + pub effective_mode: Option, + pub last_observed_leader_epoch: Option, + pub last_observed_leader_node: Option, + pub last_applied_log_lsn: Option, + pub last_record_applied_at: Option, + pub skipped_records: u64, + pub replay_status: ReplayStatus, +} + +impl PartitionProcessorStatus { + pub fn new(planned_mode: RunMode) -> Self { + Self { + updated_at: MillisSinceEpoch::now(), + planned_mode, + effective_mode: None, + last_observed_leader_epoch: None, + last_observed_leader_node: None, + last_applied_log_lsn: None, + last_record_applied_at: None, + skipped_records: 0, + replay_status: ReplayStatus::Starting, + } + } +} diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 5044625f5..663874a57 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -59,6 +59,7 @@ humantime = { workspace = true } metrics = { workspace = true } opentelemetry = { workspace = true } pin-project = { workspace = true } +rand = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 0e37a8e06..ab2019fad 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -160,10 +160,12 @@ impl Worker { )?; let partition_processor_manager = PartitionProcessorManager::new( + task_center(), updateable_config.clone(), metadata.clone(), metadata_store_client, partition_store_manager.clone(), + router_builder, networking, bifrost, invoker.handle(), diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 4c27265fc..918924ad3 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -22,10 +22,14 @@ use restate_core::metadata; use restate_network::Networking; use restate_partition_store::{PartitionStore, RocksDBTransaction}; use restate_types::identifiers::{PartitionId, PartitionKey}; +use restate_types::processors::{PartitionProcessorStatus, ReplayStatus, RunMode}; +use restate_types::time::MillisSinceEpoch; use std::fmt::Debug; use std::marker::PhantomData; use std::ops::RangeInclusive; -use std::time::Instant; +use std::time::{Duration, Instant}; +use tokio::sync::{mpsc, watch}; +use tokio::time::MissedTickBehavior; use 
tracing::{debug, instrument, trace, Span}; mod action_effect_handler; @@ -47,6 +51,9 @@ use restate_wal_protocol::{Command, Destination, Envelope, Header}; use self::storage::invoker::InvokerStorageReader; +/// Control messages from Manager to individual partition processor instances. +pub enum PartitionProcessorControlCommand {} + #[derive(Debug)] pub(super) struct PartitionProcessor { pub partition_id: PartitionId, @@ -55,7 +62,10 @@ pub(super) struct PartitionProcessor { num_timers_in_memory_limit: Option, channel_size: usize, + status: PartitionProcessorStatus, invoker_tx: InvokerInputSender, + control_rx: mpsc::Receiver, + status_watch_tx: watch::Sender, _entry_codec: PhantomData, } @@ -70,23 +80,29 @@ where pub(super) fn new( partition_id: PartitionId, partition_key_range: RangeInclusive, + status: PartitionProcessorStatus, num_timers_in_memory_limit: Option, channel_size: usize, + control_rx: mpsc::Receiver, + status_watch_tx: watch::Sender, invoker_tx: InvokerInputSender, ) -> Self { Self { partition_id, partition_key_range, + status, num_timers_in_memory_limit, channel_size, invoker_tx, + control_rx, + status_watch_tx, _entry_codec: Default::default(), } } #[instrument(level = "info", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))] pub(super) async fn run( - self, + mut self, networking: Networking, bifrost: Bifrost, partition_store: PartitionStore, @@ -111,15 +127,22 @@ where let last_applied_lsn = partition_storage.load_applied_lsn().await?; let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID); - if tracing::event_enabled!(tracing::Level::DEBUG) { - let current_tail = bifrost - .find_tail(LogId::from(partition_id), FindTailAttributes::default()) - .await?; - debug!( - last_applied_lsn = %last_applied_lsn, - current_log_tail = ?current_tail, - "PartitionProcessor creating log reader", - ); + self.status.last_applied_log_lsn = Some(last_applied_lsn); + let current_tail = bifrost + .find_tail(LogId::from(partition_id), FindTailAttributes::default()) + .await?; + debug!( + last_applied_lsn = %last_applied_lsn, + current_log_tail = ?current_tail, + "PartitionProcessor creating log reader", + ); + if current_tail.is_none() || current_tail.is_some_and(|tail| tail == last_applied_lsn) { + self.status.replay_status = ReplayStatus::Active; + } else { + // catching up. + self.status.replay_status = ReplayStatus::CatchingUp { + target_tail_lsn: current_tail.unwrap(), + } } let mut log_reader = LogReader::new(&bifrost, LogId::from(partition_id), last_applied_lsn); @@ -135,12 +158,25 @@ where bifrost, networking, ); + // avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis. + let mut status_update_timer = + tokio::time::interval(Duration::from_millis(500 + rand::random::() % 524)); + status_update_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut cancellation = std::pin::pin!(cancellation_watcher()); let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string())); loop { tokio::select! 
{ _ = &mut cancellation => break, + _command = self.control_rx.recv() => { + // todo: handle leadership change requests here + } + _ = status_update_timer.tick() => { + self.status_watch_tx.send_modify(|old| { + old.clone_from(&self.status); + old.updated_at = MillisSinceEpoch::now(); + }); + } record = log_reader.read_next() => { let command_start = Instant::now(); let record = record?; @@ -154,6 +190,7 @@ where let leadership_change = Self::apply_record( record, + &mut self.status, &mut state_machine, &mut transaction, &mut action_collector, @@ -164,6 +201,8 @@ where if let Some(announce_leader) = leadership_change { let new_esn = EpochSequenceNumber::new(announce_leader.leader_epoch); + self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch); + self.status.last_observed_leader_node = Some(announce_leader.node_id); // update our own epoch sequence number to filter out messages from previous leaders transaction.store_dedup_sequence_number(ProducerId::self_producer(), DedupSequenceNumber::Esn(new_esn)).await; // commit all changes so far, this is important so that the actuators see all changes @@ -177,6 +216,7 @@ where if announce_leader.node_id == metadata().my_node_id() { let was_follower = !state.is_leader(); (state, action_effect_stream) = state.become_leader(new_esn, &mut partition_storage).await?; + self.status.effective_mode = Some(RunMode::Leader); if was_follower { Span::current().record("is_leader", state.is_leader()); debug!(leader_epoch = %new_esn.leader_epoch, "Partition leadership acquired"); @@ -184,6 +224,7 @@ where } else { let was_leader = state.is_leader(); (state, action_effect_stream) = state.become_follower().await?; + self.status.effective_mode = Some(RunMode::Follower); if was_leader { Span::current().record("is_leader", state.is_leader()); debug!(leader_epoch = %new_esn.leader_epoch, "Partition leadership lost to {}", announce_leader.node_id); @@ -233,8 +274,10 @@ where Ok(state_machine) } + #[allow(clippy::too_many_arguments)] async fn apply_record( record: (Lsn, Envelope), + status: &mut PartitionProcessorStatus, state_machine: &mut StateMachine, transaction: &mut Transaction>, action_collector: &mut ActionCollector, @@ -248,6 +291,19 @@ where let (lsn, envelope) = record; transaction.store_applied_lsn(lsn).await?; + // Update replay status + status.last_applied_log_lsn = Some(record.0); + status.last_record_applied_at = Some(MillisSinceEpoch::now()); + match status.replay_status { + ReplayStatus::CatchingUp { + // finished catching up + target_tail_lsn, + } if record.0 >= target_tail_lsn => { + status.replay_status = ReplayStatus::Active; + } + _ => {} + }; + if let Some(dedup_information) = is_targeted_to_me(&envelope.header, partition_key_range) { // deduplicate if deduplication information has been provided if let Some(dedup_information) = dedup_information { @@ -257,14 +313,13 @@ where envelope.header ); return Ok(None); - } else { - transaction - .store_dedup_sequence_number( - dedup_information.producer_id.clone(), - dedup_information.sequence_number, - ) - .await; } + transaction + .store_dedup_sequence_number( + dedup_information.producer_id.clone(), + dedup_information.sequence_number, + ) + .await; } if let Command::AnnounceLeader(announce_leader) = envelope.command { @@ -306,6 +361,7 @@ where .await?; } } else { + status.skipped_records += 1; trace!( "Ignore message which is not targeted to me: {:?}", envelope.header diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs 
index 32893852d..2b0d2ccc9 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -8,15 +8,33 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::partition::storage::invoker::InvokerStorageReader; -use crate::PartitionProcessor; +use std::collections::BTreeMap; +use std::ops::RangeInclusive; +use std::time::Duration; + use anyhow::Context; +use futures::stream::BoxStream; +use futures::stream::StreamExt; +use restate_core::network::NetworkSender; +use restate_core::TaskCenter; +use restate_network::rpc_router::{RpcError, RpcRouter}; +use restate_node_protocol::partition_processor_manager::GetProcessorsState; +use restate_node_protocol::partition_processor_manager::ProcessorsStateResponse; +use restate_node_protocol::RpcMessage; +use restate_types::processors::{PartitionProcessorStatus, RunMode}; +use tokio::sync::{mpsc, watch}; +use tracing::{debug, info, warn}; + use restate_bifrost::Bifrost; +use restate_core::network::MessageRouterBuilder; use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle}; -use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskId, TaskKind}; +use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskId, TaskKind}; use restate_invoker_impl::InvokerHandle; use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError}; use restate_network::Networking; +use restate_node_protocol::cluster_controller::AttachRequest; +use restate_node_protocol::cluster_controller::{Action, AttachResponse}; +use restate_node_protocol::MessageEnvelope; use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::ArcSwapExt; use restate_types::config::{UpdateableConfiguration, WorkerOptions}; @@ -24,50 +42,81 @@ use restate_types::epoch::EpochMetadata; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; use restate_types::logs::{LogId, Payload}; use restate_types::metadata_store::keys::partition_processor_epoch_key; -use restate_types::{GenerationalNodeId, Version}; +use restate_types::time::MillisSinceEpoch; +use restate_types::GenerationalNodeId; use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::{Command as WalCommand, Destination, Envelope, Header, Source}; -use std::collections::HashMap; -use std::ops::RangeInclusive; -use tokio::sync::mpsc; -use tracing::{debug, info}; + +use crate::partition::storage::invoker::InvokerStorageReader; +use crate::partition::PartitionProcessorControlCommand; +use crate::PartitionProcessor; pub struct PartitionProcessorManager { + task_center: TaskCenter, updateable_config: UpdateableConfiguration, - running_partition_processors: HashMap, + running_partition_processors: BTreeMap, metadata: Metadata, metadata_store_client: MetadataStoreClient, partition_store_manager: PartitionStoreManager, + attach_router: RpcRouter, + incoming_get_state: BoxStream<'static, MessageEnvelope>, networking: Networking, bifrost: Bifrost, invoker_handle: InvokerHandle>, rx: mpsc::Receiver, tx: mpsc::Sender, + latest_attach_response: Option<(GenerationalNodeId, AttachResponse)>, +} + +#[derive(Debug, thiserror::Error)] +enum AttachError { + #[error("No cluster controller found in nodes configuration")] + NoClusterController, + #[error(transparent)] + ShutdownError(#[from] ShutdownError), +} + +struct State { + _created_at: MillisSinceEpoch, + _key_range: RangeInclusive, + 
_control_tx: mpsc::Sender, + watch_rx: watch::Receiver, + _task_id: TaskId, } impl PartitionProcessorManager { + #[allow(clippy::too_many_arguments)] pub fn new( + task_center: TaskCenter, updateable_config: UpdateableConfiguration, metadata: Metadata, metadata_store_client: MetadataStoreClient, partition_store_manager: PartitionStoreManager, + router_builder: &mut MessageRouterBuilder, networking: Networking, bifrost: Bifrost, invoker_handle: InvokerHandle>, ) -> Self { + let attach_router = RpcRouter::new(networking.clone(), router_builder); + let incoming_get_state = router_builder.subscribe_to_stream(2); + let (tx, rx) = mpsc::channel(updateable_config.load().worker.internal_queue_length()); Self { + task_center, updateable_config, - running_partition_processors: HashMap::default(), + running_partition_processors: BTreeMap::default(), metadata, metadata_store_client, partition_store_manager, + incoming_get_state, networking, bifrost, invoker_handle, + attach_router, rx, tx, + latest_attach_response: None, } } @@ -75,29 +124,71 @@ impl PartitionProcessorManager { ProcessorsManagerHandle::new(self.tx.clone()) } - pub async fn run(mut self) -> anyhow::Result<()> { - self.attach_worker().await?; - - // simulating a plan after initial attachement - let partition_table = self.metadata.wait_for_partition_table(Version::MIN).await?; - let plan = PartitionProcessorPlan::new( - partition_table.version(), - partition_table - .partitioner() - .map(|(partition_id, _)| (partition_id, Action::Start(Role::Leader))) - .collect(), - ); - self.apply_plan(plan).await?; + async fn attach(&mut self) -> Result, AttachError> { + loop { + // We try to get the admin node on every retry since it might change between retries. + let admin_node = self + .metadata + .nodes_config() + .get_admin_node() + .ok_or(AttachError::NoClusterController)? + .current_generation; + + info!( + "Attempting to attach to cluster controller '{}'", + admin_node + ); + if admin_node == self.metadata.my_node_id() { + // If this node is running the cluster controller, we need to wait a little to give cluster + // controller time to start up. This is only done to reduce the chances of observing + // connection errors in log. Such logs are benign since we retry, but it's still not nice + // to print, specially in a single-node setup. + info!( "This node is the cluster controller, giving cluster controller service 500ms to start"); + tokio::time::sleep(Duration::from_millis(500)).await; + } + match self + .attach_router + .call(admin_node.into(), &AttachRequest::default()) + .await + { + Ok(response) => return Ok(response), + Err(RpcError::Shutdown(e)) => return Err(AttachError::ShutdownError(e)), + Err(e) => { + warn!( + "Failed to send attach message to cluster controller: {}, retrying....", + e + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + } + + pub async fn run(mut self) -> anyhow::Result<()> { let shutdown = cancellation_watcher(); tokio::pin!(shutdown); + // Initial attach + let response = tokio::time::timeout(Duration::from_secs(5), self.attach()) + .await + .context("Timeout waiting to attach to a cluster controller")??; + + let (from, msg) = response.split(); + // We ignore errors due to shutdown + let _ = self.apply_plan(&msg.actions); + self.latest_attach_response = Some((from, msg)); + info!("Plan applied from attaching to controller {}", from); + loop { tokio::select! 
{ Some(command) = self.rx.recv() => { - self.handle_command(command).await; + self.on_command(command); debug!("PartitionProcessorManager shutting down"); } + Some(get_state) = self.incoming_get_state.next() => { + self.on_get_state(get_state); + } _ = &mut shutdown => { return Ok(()); } @@ -105,7 +196,33 @@ impl PartitionProcessorManager { } } - async fn handle_command(&mut self, command: ProcessorsManagerCommand) { + fn on_get_state(&self, get_state_msg: MessageEnvelope) { + let (from, msg) = get_state_msg.split(); + // For all running partitions, collect state and send it back. + let state: BTreeMap = self + .running_partition_processors + .iter() + .map(|(partition_id, state)| { + let status = state.watch_rx.borrow().clone(); + (*partition_id, status) + }) + .collect(); + + let response = ProcessorsStateResponse { + request_id: msg.correlation_id(), + state, + }; + let networking = self.networking.clone(); + // ignore shutdown errors. + let _ = self.task_center.spawn( + restate_core::TaskKind::Disposable, + "get-processors-state-response", + None, + async move { Ok(networking.send(from.into(), &response).await?) }, + ); + } + + fn on_command(&mut self, command: ProcessorsManagerCommand) { use ProcessorsManagerCommand::*; match command { GetLivePartitions(sender) => { @@ -115,75 +232,77 @@ impl PartitionProcessorManager { } } - async fn attach_worker(&mut self) -> anyhow::Result<()> { - let admin_address = self - .metadata - .nodes_config() - .get_admin_node() - .expect("at least one admin node") - .address - .clone(); - - info!("Worker attaching to admin at '{admin_address}'"); - // todo: use Networking to attach to a cluster admin node. - Ok(()) - } - - #[allow(clippy::map_entry)] - pub async fn apply_plan(&mut self, plan: PartitionProcessorPlan) -> anyhow::Result<()> { + pub fn apply_plan(&mut self, actions: &[Action]) -> Result<(), ShutdownError> { let config = self.updateable_config.pinned(); let options = &config.worker; - let partition_table = self - .metadata - .wait_for_partition_table(plan.min_required_partition_table_version) - .await?; - for (partition_id, action) in plan.actions { + for action in actions { match action { - Action::Start(role) => { + Action::RunPartition(action) => { + #[allow(clippy::map_entry)] if !self .running_partition_processors - .contains_key(&partition_id) + .contains_key(&action.partition_id) { - let partition_range = partition_table - .partition_range(partition_id) - .expect("partition_range to be known"); - let task_id = self.spawn_partition_processor( + let (control_tx, control_rx) = mpsc::channel(2); + let status = PartitionProcessorStatus::new(action.mode); + let (watch_tx, watch_rx) = watch::channel(status.clone()); + + let _task_id = self.spawn_partition_processor( options, - partition_id, - partition_range, - role, + action.partition_id, + action.key_range_inclusive.clone().into(), + status, + control_rx, + watch_tx, )?; + let state = State { + _created_at: MillisSinceEpoch::now(), + _key_range: action.key_range_inclusive.clone().into(), + _task_id, + _control_tx: control_tx, + watch_rx, + }; self.running_partition_processors - .insert(partition_id, task_id); + .insert(action.partition_id, state); } else { debug!( "Partition processor for partition id '{}' is already running.", - partition_id + action.partition_id ); } } } } - Ok(()) } fn spawn_partition_processor( - &mut self, + &self, options: &WorkerOptions, partition_id: PartitionId, - partition_range: RangeInclusive, - role: Role, + key_range: RangeInclusive, + status: 
PartitionProcessorStatus, + control_rx: mpsc::Receiver, + watch_tx: watch::Sender, ) -> Result { - let processor = - self.create_partition_processor(options, partition_id, partition_range.clone()); + let planned_mode = status.planned_mode; + let processor = PartitionProcessor::new( + partition_id, + key_range.clone(), + status, + options.num_timers_in_memory_limit(), + options.internal_queue_length(), + control_rx, + watch_tx, + self.invoker_handle.clone(), + ); let networking = self.networking.clone(); let mut bifrost = self.bifrost.clone(); let metadata_store_client = self.metadata_store_client.clone(); let node_id = self.metadata.my_node_id(); - task_center().spawn_child( + self.task_center.spawn_child( TaskKind::PartitionProcessor, "partition-processor", Some(processor.partition_id), @@ -194,18 +313,18 @@ impl PartitionProcessorManager { let partition_store = storage_manager .open_partition_store( partition_id, - partition_range.clone(), + key_range.clone(), OpenMode::CreateIfMissing, &options.storage.rocksdb, ) .await?; - if role == Role::Leader { + if planned_mode == RunMode::Leader { Self::claim_leadership( &mut bifrost, metadata_store_client, partition_id, - partition_range, + key_range, node_id, ) .await?; @@ -217,21 +336,6 @@ impl PartitionProcessorManager { ) } - fn create_partition_processor( - &self, - options: &WorkerOptions, - partition_id: PartitionId, - partition_key_range: RangeInclusive, - ) -> PartitionProcessor { - PartitionProcessor::new( - partition_id, - partition_key_range, - options.num_timers_in_memory_limit(), - options.internal_queue_length(), - self.invoker_handle.clone(), - ) - } - async fn claim_leadership( bifrost: &mut Bifrost, metadata_store_client: MetadataStoreClient, @@ -303,33 +407,3 @@ impl PartitionProcessorManager { Ok(()) } } - -#[derive(Debug)] -pub struct PartitionProcessorPlan { - min_required_partition_table_version: Version, - actions: HashMap, -} - -impl PartitionProcessorPlan { - pub fn new( - min_required_partition_table_version: Version, - actions: HashMap, - ) -> Self { - Self { - min_required_partition_table_version, - actions, - } - } -} - -#[derive(Debug)] -pub enum Action { - Start(Role), -} - -#[derive(Debug, PartialEq)] -pub enum Role { - Leader, - #[allow(dead_code)] - Follower, -}
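
Not part of the patch above: a minimal, self-contained sketch of how a consumer of the new ClusterControllerHandle::get_cluster_state() snapshot might use it, mirroring the ClusterState / NodeState shapes and the 10-second staleness rule introduced in crates/cluster-controller/src/cluster_state.rs. The types here are simplified local stand-ins (std only, plain u32 node ids, no partition status maps or metadata versions), and the summarize helper is a hypothetical name for illustration; the real types live in the restate-cluster-controller crate.

// Illustrative sketch only: simplified stand-ins for ClusterState / NodeState,
// showing how a caller might summarize node health from a cluster-state snapshot.
use std::collections::BTreeMap;
use std::time::{Duration, Instant, SystemTime};

enum NodeState {
    Alive { last_heartbeat_at: SystemTime },
    Dead { last_seen_alive: Option<SystemTime> },
}

struct ClusterState {
    last_refreshed: Option<Instant>,
    nodes: BTreeMap<u32, NodeState>, // keyed by plain node id
}

impl ClusterState {
    // Mirrors the patch's is_reliable(): a snapshot older than 10 seconds is stale.
    fn is_reliable(&self) -> bool {
        self.last_refreshed
            .map(|t| t.elapsed() < Duration::from_secs(10))
            .unwrap_or(false)
    }
}

fn summarize(state: &ClusterState) -> String {
    let alive = state
        .nodes
        .values()
        .filter(|n| matches!(n, NodeState::Alive { .. }))
        .count();
    let dead = state.nodes.len() - alive;
    format!("{alive} alive, {dead} dead, reliable: {}", state.is_reliable())
}

fn main() {
    let mut nodes = BTreeMap::new();
    nodes.insert(1, NodeState::Alive { last_heartbeat_at: SystemTime::now() });
    nodes.insert(2, NodeState::Dead { last_seen_alive: None });
    let state = ClusterState { last_refreshed: Some(Instant::now()), nodes };
    println!("{}", summarize(&state));
}

In the patch itself this role is played by the gRPC GetClusterState handler, which for now only logs the snapshot (see the todo in cluster_ctrl.rs about returning the actual state via protobuf); the sketch shows the kind of read-only inspection such a handler could perform once the response type carries real data.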