diff --git a/Cargo.lock b/Cargo.lock
index 316904314..c2c155e95 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5339,6 +5339,8 @@ dependencies = [
  "futures",
  "restate-core",
  "restate-errors",
+ "restate-network",
+ "restate-node-protocol",
  "restate-types",
  "schemars",
  "serde",
@@ -5779,9 +5781,11 @@ dependencies = [
  "prost",
  "prost-build",
  "prost-types",
+ "rand",
  "restate-schema",
  "restate-types",
  "serde",
+ "serde_with",
  "strum 0.26.2",
  "strum_macros 0.26.2",
  "thiserror",
diff --git a/crates/cluster-controller/Cargo.toml b/crates/cluster-controller/Cargo.toml
index acc2dae8a..2bbac7cb5 100644
--- a/crates/cluster-controller/Cargo.toml
+++ b/crates/cluster-controller/Cargo.toml
@@ -14,6 +14,8 @@ options_schema = ["dep:schemars"]
 [dependencies]
 restate-core = { workspace = true }
 restate-errors = { workspace = true }
+restate-network = { workspace = true }
+restate-node-protocol = { workspace = true }
 restate-types = { workspace = true }
 
 anyhow = { workspace = true }
diff --git a/crates/cluster-controller/src/service.rs b/crates/cluster-controller/src/service.rs
index d5c287ec5..50b841d45 100644
--- a/crates/cluster-controller/src/service.rs
+++ b/crates/cluster-controller/src/service.rs
@@ -9,7 +9,20 @@
 // by the Apache License, Version 2.0.
 
 use codederror::CodedError;
-use restate_core::cancellation_watcher;
+use futures::stream::BoxStream;
+use futures::StreamExt;
+
+use restate_network::Networking;
+use restate_node_protocol::cluster_controller::{
+    Action, AttachRequest, AttachResponse, RunMode, RunPartition,
+};
+use restate_node_protocol::common::{KeyRange, RequestId};
+use restate_types::partition_table::FixedPartitionTable;
+
+use restate_core::network::{MessageRouterBuilder, NetworkSender};
+use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskCenter};
+use restate_node_protocol::MessageEnvelope;
+use restate_types::{GenerationalNodeId, Version};
 
 #[derive(Debug, thiserror::Error, CodedError)]
 pub enum Error {
@@ -18,10 +31,27 @@ pub enum Error {
     Error,
 }
 
-#[derive(Debug, Default)]
-pub struct Service {}
+pub struct Service {
+    metadata: Metadata,
+    networking: Networking,
+    incoming_messages: BoxStream<'static, MessageEnvelope<AttachRequest>>,
+}
+
+impl Service {
+    pub fn new(
+        metadata: Metadata,
+        networking: Networking,
+        router_builder: &mut MessageRouterBuilder,
+    ) -> Self {
+        let incoming_messages = router_builder.subscribe_to_stream(10);
+        Service {
+            metadata,
+            networking,
+            incoming_messages,
+        }
+    }
+}
 
-// todo: Replace with proper handle
 pub struct ClusterControllerHandle;
 
 impl Service {
@@ -29,8 +59,69 @@ impl Service {
         ClusterControllerHandle
     }
 
-    pub async fn run(self) -> anyhow::Result<()> {
-        let _ = cancellation_watcher().await;
+    pub async fn run(mut self) -> anyhow::Result<()> {
+        // Make sure we have a partition table before starting
+        let _ = self.metadata.wait_for_partition_table(Version::MIN).await?;
+
+        let mut shutdown = std::pin::pin!(cancellation_watcher());
+        let tc = task_center();
+        loop {
+            tokio::select! {
+                Some(message) = self.incoming_messages.next() => {
+                    let (from, message) = message.split();
+                    self.handle_attach_request(&tc, from, message).await?;
+                }
+                _ = &mut shutdown => {
+                    return Ok(());
+                }
+            }
+        }
+    }
+
+    async fn handle_attach_request(
+        &mut self,
+        tc: &TaskCenter,
+        from: GenerationalNodeId,
+        request: AttachRequest,
+    ) -> Result<(), ShutdownError> {
+        let partition_table = self
+            .metadata
+            .partition_table()
+            .expect("partition table is loaded before run");
+        let networking = self.networking.clone();
+        let response = self.create_attachment_response(&partition_table, from, request.request_id);
+        tc.spawn(
+            restate_core::TaskKind::Disposable,
+            "attachment-response",
+            None,
+            async move { Ok(networking.send(from.into(), &response).await?) },
+        )?;
         Ok(())
     }
+
+    fn create_attachment_response(
+        &self,
+        partition_table: &FixedPartitionTable,
+        _node: GenerationalNodeId,
+        request_id: RequestId,
+    ) -> AttachResponse {
+        // simulating a plan after initial attachment
+        let actions = partition_table
+            .partitioner()
+            .map(|(partition_id, key_range)| {
+                Action::RunPartition(RunPartition {
+                    partition_id,
+                    key_range_inclusive: KeyRange {
+                        from: *key_range.start(),
+                        to: *key_range.end(),
+                    },
+                    mode: RunMode::Leader,
+                })
+            })
+            .collect();
+        AttachResponse {
+            request_id,
+            actions,
+        }
+    }
 }
diff --git a/crates/core/src/network/message_router.rs b/crates/core/src/network/message_router.rs
index 69aa4da94..c1e48e95d 100644
--- a/crates/core/src/network/message_router.rs
+++ b/crates/core/src/network/message_router.rs
@@ -23,7 +23,7 @@ use restate_node_protocol::MessageEnvelope;
 use restate_types::GenerationalNodeId;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
-use tracing::warn;
+use tracing::{info, warn};
 
 use crate::is_cancellation_requested;
 
@@ -124,6 +124,7 @@ impl MessageRouterBuilder {
         if self.handlers.insert(target, Box::new(wrapped)).is_some() {
             panic!("Handler for target {} has been registered already!", target);
         }
+        info!("Attached to target {}", target);
 
         Box::pin(ReceiverStream::new(rx))
     }
diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/core/src/worker_api/partition_processor_manager.rs
index 5486de0a4..1e1de711a 100644
--- a/crates/core/src/worker_api/partition_processor_manager.rs
+++ b/crates/core/src/worker_api/partition_processor_manager.rs
@@ -8,9 +8,14 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use restate_types::identifiers::PartitionId;
 use tokio::sync::{mpsc, oneshot};
 
+use restate_node_protocol::cluster_controller::RunMode;
+use restate_types::identifiers::{LeaderEpoch, PartitionId};
+use restate_types::logs::Lsn;
+use restate_types::time::MillisSinceEpoch;
+use restate_types::GenerationalNodeId;
+
 use crate::ShutdownError;
 
 #[derive(Debug)]
@@ -18,6 +23,42 @@ pub enum ProcessorsManagerCommand {
     GetLivePartitions(oneshot::Sender>),
 }
 
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub enum ReplayStatus {
+    Starting,
+    Active,
+    CatchingUp { target_tail_lsn: Lsn },
+}
+
+#[derive(Debug, Clone)]
+pub struct PartitionProcessorStatus {
+    pub updated_at: MillisSinceEpoch,
+    pub planned_mode: RunMode,
+    pub effective_mode: Option<RunMode>,
+    pub last_observed_leader_epoch: Option<LeaderEpoch>,
+    pub last_observed_leader_node: Option<GenerationalNodeId>,
+    pub last_applied_log_lsn: Option<Lsn>,
+    pub last_record_applied_at: Option<MillisSinceEpoch>,
+    pub skipped_records: u64,
+    pub replay_status: ReplayStatus,
+}
+
+impl PartitionProcessorStatus {
+    pub fn new(planned_mode: RunMode) -> Self {
+        Self {
+            updated_at: MillisSinceEpoch::now(),
+            planned_mode,
+            effective_mode: None,
+            last_observed_leader_epoch: None,
+            last_observed_leader_node: None,
+            last_applied_log_lsn: None,
+            last_record_applied_at: None,
+            skipped_records: 0,
+            replay_status: ReplayStatus::Starting,
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct ProcessorsManagerHandle(mpsc::Sender);
 
diff --git a/crates/node-protocol/Cargo.toml b/crates/node-protocol/Cargo.toml
index 678720cac..059ebf580 100644
--- a/crates/node-protocol/Cargo.toml
+++ b/crates/node-protocol/Cargo.toml
@@ -22,7 +22,9 @@ enum-map = { workspace = true }
 flexbuffers = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
+rand = { workspace = true }
 serde = { workspace = true}
+serde_with = { workspace = true }
 strum = { workspace = true }
 strum_macros = { workspace = true }
 thiserror = { workspace = true }
diff --git a/crates/node-protocol/proto/common.proto b/crates/node-protocol/proto/common.proto
index 42ea2e1d0..755ca2acc 100644
--- a/crates/node-protocol/proto/common.proto
+++ b/crates/node-protocol/proto/common.proto
@@ -22,16 +22,17 @@ message NodeId {
 }
 
 // A generic type for versioned metadata
-message Version {
-  uint32 value = 1;
-}
+message Version { uint32 value = 1; }
 
-// The target service for a message
+// The handle name or type tag of the message. For every target there must be
+// exactly one message handler implementation.
 enum TargetName {
   TargetName_UNKNOWN = 0;
   METADATA_MANAGER = 1;
   INGRESS = 2;
   LOCAL_METADATA_STORE = 3;
   LOCAL_METADATA_STORE_CLIENT = 4;
+  ATTACH_REQUEST = 5;
+  ATTACH_RESPONSE = 6;
+  PARTITION_PROCESSOR_MANAGER_REQUESTS = 7;
 }
-
diff --git a/crates/node-protocol/src/cluster_controller.rs b/crates/node-protocol/src/cluster_controller.rs
new file mode 100644
index 000000000..723afc513
--- /dev/null
+++ b/crates/node-protocol/src/cluster_controller.rs
@@ -0,0 +1,52 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use restate_types::identifiers::PartitionId;
+use serde::{Deserialize, Serialize};
+
+use crate::common::{KeyRange, RequestId, TargetName};
+use crate::define_rpc;
+use crate::CodecError;
+
+define_rpc! {
+    @request = AttachRequest,
+    @response = AttachResponse,
+    @request_target = TargetName::AttachRequest,
+    @response_target = TargetName::AttachResponse,
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct AttachRequest {
+    pub request_id: RequestId,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AttachResponse {
+    pub request_id: RequestId,
+    pub actions: Vec<Action>,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
+pub enum RunMode {
+    Leader,
+    Follower,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum Action {
+    RunPartition(RunPartition),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RunPartition {
+    pub partition_id: PartitionId,
+    pub key_range_inclusive: KeyRange,
+    pub mode: RunMode,
+}
diff --git a/crates/node-protocol/src/common.rs b/crates/node-protocol/src/common.rs
index da63dc83e..cfdbd6f1e 100644
--- a/crates/node-protocol/src/common.rs
+++ b/crates/node-protocol/src/common.rs
@@ -8,11 +8,44 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::ops::RangeInclusive;
+
+use rand::RngCore;
+use restate_types::identifiers::PartitionKey;
+use serde::{Deserialize, Serialize};
+
 include!(concat!(env!("OUT_DIR"), "/dev.restate.common.rs"));
 
 pub static MIN_SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Flexbuffers;
 pub static CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Flexbuffers;
 
+/// Used to identify a request in an RPC-style call going through Networking.
+#[derive(
+    Debug,
+    derive_more::Display,
+    PartialEq,
+    Eq,
+    Clone,
+    Copy,
+    Hash,
+    PartialOrd,
+    Ord,
+    serde::Serialize,
+    serde::Deserialize,
+)]
+pub struct RequestId(u64);
+impl RequestId {
+    pub fn new() -> Self {
+        Default::default()
+    }
+}
+
+impl Default for RequestId {
+    fn default() -> Self {
+        RequestId(rand::thread_rng().next_u64())
+    }
+}
+
 pub const FILE_DESCRIPTOR_SET: &[u8] =
     include_bytes!(concat!(env!("OUT_DIR"), "/common_descriptor.bin"));
 
@@ -69,3 +102,15 @@ impl From for NodeId {
         }
     }
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct KeyRange {
+    pub from: PartitionKey,
+    pub to: PartitionKey,
+}
+
+impl From<KeyRange> for RangeInclusive<PartitionKey> {
+    fn from(val: KeyRange) -> Self {
+        RangeInclusive::new(val.from, val.to)
+    }
+}
diff --git a/crates/node-protocol/src/lib.rs b/crates/node-protocol/src/lib.rs
index 35c2c6026..a4bb4cbcb 100644
--- a/crates/node-protocol/src/lib.rs
+++ b/crates/node-protocol/src/lib.rs
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+pub mod cluster_controller;
 pub mod codec;
 pub mod common;
 mod error;
@@ -50,6 +51,10 @@ impl MessageEnvelope {
     pub fn split(self) -> (GenerationalNodeId, M) {
         (self.peer, self.body)
     }
+
+    pub fn body(&self) -> &M {
+        &self.body
+    }
 }
 
 impl MessageEnvelope {
diff --git a/crates/node-services/proto/cluster_ctrl_svc.proto b/crates/node-services/proto/cluster_ctrl_svc.proto
index 980527336..4f1b4bd59 100644
--- a/crates/node-services/proto/cluster_ctrl_svc.proto
+++ b/crates/node-services/proto/cluster_ctrl_svc.proto
@@ -14,13 +14,10 @@ import "common.proto";
 package dev.restate.cluster_ctrl;
 
 service ClusterCtrlSvc {
-  // Attach worker at cluster controller
-  rpc AttachNode(AttachmentRequest) returns (AttachmentResponse);
-
+  // Get the current status of partitions from this node.
+  rpc GetPartitionsStatus(PartitionsStatusRequest) returns (PartitionsStatusResponse);
 }
 
-message AttachmentRequest {
-  optional dev.restate.common.NodeId node_id = 1;
-}
+message PartitionsStatusRequest { }
 
-message AttachmentResponse {}
+message PartitionsStatusResponse { }
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index 2c0dd1a11..1e493dc23 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -145,7 +145,10 @@ impl Node {
         let admin_role = if config.has_role(Role::Admin) {
             Some(AdminRole::new(
                 updateable_config.clone(),
+                metadata.clone(),
+                networking.clone(),
                 metadata_manager.writer(),
+                &mut router_builder,
                 metadata_store_client.clone(),
             )?)
         } else {
diff --git a/crates/node/src/network_server/handler/cluster_ctrl.rs b/crates/node/src/network_server/handler/cluster_ctrl.rs
index 8be95b29b..bfb1e18c1 100644
--- a/crates/node/src/network_server/handler/cluster_ctrl.rs
+++ b/crates/node/src/network_server/handler/cluster_ctrl.rs
@@ -9,10 +9,9 @@
 // by the Apache License, Version 2.0.
 
 use tonic::{async_trait, Request, Response, Status};
-use tracing::debug;
 
 use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvc;
-use restate_node_services::cluster_ctrl::{AttachmentRequest, AttachmentResponse};
+use restate_node_services::cluster_ctrl::{PartitionsStatusRequest, PartitionsStatusResponse};
 
 pub struct ClusterCtrlSvcHandler {}
 
@@ -24,12 +23,10 @@ impl ClusterCtrlSvcHandler {
 
 #[async_trait]
 impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
-    async fn attach_node(
+    async fn get_partitions_status(
        &self,
-        request: Request<AttachmentRequest>,
-    ) -> Result<Response<AttachmentResponse>, Status> {
-        let node_id = request.into_inner().node_id.expect("node id must be set");
-        debug!("Attaching node '{:?}'", node_id);
-        Ok(Response::new(AttachmentResponse {}))
+        _request: Request<PartitionsStatusRequest>,
+    ) -> Result<Response<PartitionsStatusResponse>, Status> {
+        unimplemented!()
     }
 }
diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs
index 5477e1b50..1f186ae36 100644
--- a/crates/node/src/roles/admin.rs
+++ b/crates/node/src/roles/admin.rs
@@ -10,7 +10,8 @@
 
 use anyhow::Context;
 use codederror::CodedError;
-use restate_types::arc_util::ArcSwapExt;
+use restate_core::network::MessageRouterBuilder;
+use restate_network::Networking;
 use std::time::Duration;
 use tonic::transport::Channel;
 
@@ -18,10 +19,11 @@ use restate_admin::service::AdminService;
 use restate_bifrost::Bifrost;
 use restate_cluster_controller::ClusterControllerHandle;
 use restate_core::metadata_store::MetadataStoreClient;
-use restate_core::{task_center, MetadataWriter, TaskKind};
+use restate_core::{task_center, Metadata, MetadataWriter, TaskKind};
 use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
 use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
 use restate_service_protocol::discovery::ServiceDiscovery;
+use restate_types::arc_util::ArcSwapExt;
 use restate_types::config::{IngressOptions, UpdateableConfiguration};
 use restate_types::retries::RetryPolicy;
 
@@ -47,7 +49,10 @@ pub struct AdminRole {
 impl AdminRole {
     pub fn new(
         updateable_config: UpdateableConfiguration,
+        metadata: Metadata,
+        networking: Networking,
         metadata_writer: MetadataWriter,
+        router_builder: &mut MessageRouterBuilder,
         metadata_store_client: MetadataStoreClient,
     ) -> Result {
         let config = updateable_config.pinned();
@@ -70,9 +75,12 @@ impl AdminRole {
             service_discovery,
         );
 
+        let controller =
+            restate_cluster_controller::Service::new(metadata, networking, router_builder);
+
         Ok(AdminRole {
             updateable_config,
-            controller: restate_cluster_controller::Service::default(),
+            controller,
             admin,
         })
     }
diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs
index bc96ebd89..e8005002c 100644
--- a/crates/worker/src/lib.rs
+++ b/crates/worker/src/lib.rs
@@ -159,6 +159,7 @@ impl Worker {
             metadata.clone(),
             metadata_store_client,
             partition_store_manager.clone(),
+            router_builder,
             networking,
             bifrost,
             invoker.handle(),
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 16dd73206..f03d811e2 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -20,13 +20,18 @@ use assert2::let_assert;
 use futures::StreamExt;
 use metrics::{counter, histogram};
 use restate_core::metadata;
+use restate_core::worker_api::{PartitionProcessorStatus, ReplayStatus};
 use restate_network::Networking;
+use restate_node_protocol::cluster_controller::RunMode;
 use restate_partition_store::{PartitionStore, RocksDBTransaction};
 use restate_types::identifiers::{PartitionId, PartitionKey};
+use restate_types::time::MillisSinceEpoch;
 use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::ops::RangeInclusive;
-use std::time::Instant;
+use std::time::{Duration, Instant};
+use tokio::sync::{mpsc, watch};
+use tokio::time::MissedTickBehavior;
 use tracing::{debug, instrument, trace, Span};
 
 mod action_effect_handler;
@@ -48,6 +53,9 @@ use restate_wal_protocol::{Command, Destination, Envelope, Header};
 
 use self::storage::invoker::InvokerStorageReader;
 
+/// Control messages from Manager to individual partition processor instances.
+pub enum PartitionProcessorControlCommand {}
+
 #[derive(Debug)]
 pub(super) struct PartitionProcessor {
     pub partition_id: PartitionId,
@@ -56,7 +64,10 @@ pub(super) struct PartitionProcessor {
     num_timers_in_memory_limit: Option,
     channel_size: usize,
 
+    status: PartitionProcessorStatus,
     invoker_tx: InvokerInputSender,
+    control_rx: mpsc::Receiver<PartitionProcessorControlCommand>,
+    status_watch_tx: watch::Sender<PartitionProcessorStatus>,
     _entry_codec: PhantomData,
 }
 
@@ -71,23 +82,29 @@ where
     pub(super) fn new(
         partition_id: PartitionId,
         partition_key_range: RangeInclusive,
+        status: PartitionProcessorStatus,
         num_timers_in_memory_limit: Option,
         channel_size: usize,
+        control_rx: mpsc::Receiver<PartitionProcessorControlCommand>,
+        status_watch_tx: watch::Sender<PartitionProcessorStatus>,
         invoker_tx: InvokerInputSender,
     ) -> Self {
         Self {
             partition_id,
             partition_key_range,
+            status,
             num_timers_in_memory_limit,
             channel_size,
             invoker_tx,
+            control_rx,
+            status_watch_tx,
             _entry_codec: Default::default(),
         }
     }
 
     #[instrument(level = "info", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
     pub(super) async fn run(
-        self,
+        mut self,
         networking: Networking,
         bifrost: Bifrost,
         partition_store: PartitionStore,
@@ -112,15 +129,22 @@ where
         let last_applied_lsn = partition_storage.load_applied_lsn().await?;
         let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);
 
-        if tracing::event_enabled!(tracing::Level::DEBUG) {
-            let current_tail = bifrost
-                .find_tail(LogId::from(partition_id), FindTailAttributes::default())
-                .await?;
-            debug!(
-                last_applied_lsn = %last_applied_lsn,
-                current_log_tail = ?current_tail,
-                "PartitionProcessor creating log reader",
-            );
+        self.status.last_applied_log_lsn = Some(last_applied_lsn);
+        let current_tail = bifrost
+            .find_tail(LogId::from(partition_id), FindTailAttributes::default())
+            .await?;
+        debug!(
+            last_applied_lsn = %last_applied_lsn,
+            current_log_tail = ?current_tail,
+            "PartitionProcessor creating log reader",
+        );
+        if current_tail.is_none() || current_tail.is_some_and(|tail| tail == last_applied_lsn) {
+            self.status.replay_status = ReplayStatus::Active;
+        } else {
+            // catching up.
+            self.status.replay_status = ReplayStatus::CatchingUp {
+                target_tail_lsn: current_tail.unwrap(),
+            }
         }
 
         let mut log_reader = LogReader::new(&bifrost, LogId::from(partition_id), last_applied_lsn);
@@ -136,6 +160,8 @@ where
             bifrost,
             networking,
         );
+        let mut status_update_timer = tokio::time::interval(Duration::from_millis(23));
+        status_update_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
 
         let mut cancellation = std::pin::pin!(cancellation_watcher());
         let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string()));
@@ -143,6 +169,15 @@ where
             let iteration_start = Instant::now();
             tokio::select! {
                 _ = &mut cancellation => break,
+                _command = self.control_rx.recv() => {
+                    // todo: handle leadership change requests here
+                }
+                _ = status_update_timer.tick() => {
+                    self.status_watch_tx.send_modify(|old| {
+                        old.clone_from(&self.status);
+                        old.updated_at = MillisSinceEpoch::now();
+                    });
+                }
                 record = log_reader.read_next() => {
                     let command_start = Instant::now();
                     histogram!(PP_WAIT_OR_IDLE_DURATION, PARTITION_LABEL => partition_id_str).record(iteration_start.elapsed());
@@ -157,6 +192,7 @@ where
 
                     let leadership_change = Self::apply_record(
                         record,
+                        &mut self.status,
                         &mut state_machine,
                         &mut transaction,
                         &mut action_collector,
@@ -167,6 +203,8 @@ where
 
                     if let Some(announce_leader) = leadership_change {
                         let new_esn = EpochSequenceNumber::new(announce_leader.leader_epoch);
+                        self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
+                        self.status.last_observed_leader_node = Some(announce_leader.node_id);
                         // update our own epoch sequence number to filter out messages from previous leaders
                         transaction.store_dedup_sequence_number(ProducerId::self_producer(), DedupSequenceNumber::Esn(new_esn)).await;
                         // commit all changes so far, this is important so that the actuators see all changes
@@ -180,6 +218,7 @@ where
                         if announce_leader.node_id == metadata().my_node_id() {
                             let was_follower = !state.is_leader();
                             (state, action_effect_stream) = state.become_leader(new_esn, &mut partition_storage).await?;
+                            self.status.effective_mode = Some(RunMode::Leader);
                             if was_follower {
                                 Span::current().record("is_leader", state.is_leader());
                                 debug!(leader_epoch = %new_esn.leader_epoch, "Partition leadership acquired");
@@ -187,6 +226,7 @@ where
                         } else {
                             let was_leader = state.is_leader();
                             (state, action_effect_stream) = state.become_follower().await?;
+                            self.status.effective_mode = Some(RunMode::Follower);
                             if was_leader {
                                 Span::current().record("is_leader", state.is_leader());
                                 debug!(leader_epoch = %new_esn.leader_epoch, "Partition leadership lost to {}", announce_leader.node_id);
@@ -239,8 +279,10 @@ where
         Ok(state_machine)
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn apply_record(
         record: (Lsn, Envelope),
+        status: &mut PartitionProcessorStatus,
         state_machine: &mut StateMachine,
         transaction: &mut Transaction>,
         action_collector: &mut ActionCollector,
@@ -254,6 +296,19 @@ where
         let (lsn, envelope) = record;
         transaction.store_applied_lsn(lsn).await?;
 
+        // Update replay status
+        status.last_applied_log_lsn = Some(record.0);
+        status.last_record_applied_at = Some(MillisSinceEpoch::now());
+        match status.replay_status {
+            ReplayStatus::CatchingUp {
+                // finished catching up
+                target_tail_lsn,
+            } if record.0 >= target_tail_lsn => {
+                status.replay_status = ReplayStatus::Active;
+            }
+            _ => {}
+        };
+
         if let Some(dedup_information) = is_targeted_to_me(&envelope.header, partition_key_range) {
             // deduplicate if deduplication information has been provided
             if let Some(dedup_information) = dedup_information {
@@ -263,14 +318,13 @@ where
                         envelope.header
                     );
                     return Ok(None);
-                } else {
-                    transaction
-                        .store_dedup_sequence_number(
-                            dedup_information.producer_id.clone(),
-                            dedup_information.sequence_number,
-                        )
-                        .await;
                 }
+                transaction
+                    .store_dedup_sequence_number(
+                        dedup_information.producer_id.clone(),
+                        dedup_information.sequence_number,
+                    )
+                    .await;
             }
 
             if let Command::AnnounceLeader(announce_leader) = envelope.command {
@@ -312,6 +366,7 @@ where
                 .await?;
             }
         } else {
+            status.skipped_records += 1;
             trace!(
                 "Ignore message which is not targeted to me: {:?}",
                 envelope.header
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 32893852d..b656c0451 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -8,15 +8,27 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use crate::partition::storage::invoker::InvokerStorageReader;
-use crate::PartitionProcessor;
+use std::collections::BTreeMap;
+use std::ops::RangeInclusive;
+use std::time::Duration;
+
 use anyhow::Context;
+use restate_network::rpc_router::{RpcError, RpcRouter};
+use tokio::sync::{mpsc, watch};
+use tracing::{debug, info, warn};
+
 use restate_bifrost::Bifrost;
-use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle};
+use restate_core::network::MessageRouterBuilder;
+use restate_core::worker_api::{
+    PartitionProcessorStatus, ProcessorsManagerCommand, ProcessorsManagerHandle,
+};
 use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskId, TaskKind};
 use restate_invoker_impl::InvokerHandle;
 use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError};
 use restate_network::Networking;
+use restate_node_protocol::cluster_controller::AttachRequest;
+use restate_node_protocol::cluster_controller::{Action, AttachResponse, RunMode};
+use restate_node_protocol::MessageEnvelope;
 use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager};
 use restate_types::arc_util::ArcSwapExt;
 use restate_types::config::{UpdateableConfiguration, WorkerOptions};
@@ -24,50 +36,75 @@ use restate_types::epoch::EpochMetadata;
 use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
 use restate_types::logs::{LogId, Payload};
 use restate_types::metadata_store::keys::partition_processor_epoch_key;
-use restate_types::{GenerationalNodeId, Version};
+use restate_types::time::MillisSinceEpoch;
+use restate_types::GenerationalNodeId;
 use restate_wal_protocol::control::AnnounceLeader;
 use restate_wal_protocol::{Command as WalCommand, Destination, Envelope, Header, Source};
-use std::collections::HashMap;
-use std::ops::RangeInclusive;
-use tokio::sync::mpsc;
-use tracing::{debug, info};
+
+use crate::partition::storage::invoker::InvokerStorageReader;
+use crate::partition::PartitionProcessorControlCommand;
+use crate::PartitionProcessor;
 
 pub struct PartitionProcessorManager {
     updateable_config: UpdateableConfiguration,
-    running_partition_processors: HashMap,
+    running_partition_processors: BTreeMap<PartitionId, State>,
     metadata: Metadata,
     metadata_store_client: MetadataStoreClient,
     partition_store_manager: PartitionStoreManager,
+    attach_router: RpcRouter,
     networking: Networking,
     bifrost: Bifrost,
     invoker_handle: InvokerHandle>,
     rx: mpsc::Receiver,
     tx: mpsc::Sender,
+    latest_attach_response: Option<(GenerationalNodeId, AttachResponse)>,
+}
+
+#[derive(Debug, thiserror::Error)]
+enum AttachError {
+    #[error("No cluster controller found in nodes configuration")]
+    NoClusterController,
+    #[error(transparent)]
+    ShutdownError(#[from] ShutdownError),
+}
+
+struct State {
+    _created_at: MillisSinceEpoch,
+    _key_range: RangeInclusive<PartitionKey>,
+    _control_tx: mpsc::Sender<PartitionProcessorControlCommand>,
+    _watch_rx: watch::Receiver<PartitionProcessorStatus>,
+    _task_id: TaskId,
 }
 
 impl PartitionProcessorManager {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         updateable_config: UpdateableConfiguration,
         metadata: Metadata,
         metadata_store_client: MetadataStoreClient,
         partition_store_manager: PartitionStoreManager,
+        router_builder: &mut MessageRouterBuilder,
         networking: Networking,
         bifrost: Bifrost,
         invoker_handle: InvokerHandle>,
     ) -> Self {
+        let attach_router = RpcRouter::new(networking.clone(), router_builder);
+
         let (tx, rx) = mpsc::channel(updateable_config.load().worker.internal_queue_length());
         Self {
             updateable_config,
-            running_partition_processors: HashMap::default(),
+            running_partition_processors: BTreeMap::default(),
             metadata,
             metadata_store_client,
             partition_store_manager,
             networking,
             bifrost,
             invoker_handle,
+            attach_router,
             rx,
             tx,
+            latest_attach_response: None,
         }
     }
 
@@ -75,27 +112,66 @@ impl PartitionProcessorManager {
         ProcessorsManagerHandle::new(self.tx.clone())
     }
 
-    pub async fn run(mut self) -> anyhow::Result<()> {
-        self.attach_worker().await?;
-
-        // simulating a plan after initial attachement
-        let partition_table = self.metadata.wait_for_partition_table(Version::MIN).await?;
-        let plan = PartitionProcessorPlan::new(
-            partition_table.version(),
-            partition_table
-                .partitioner()
-                .map(|(partition_id, _)| (partition_id, Action::Start(Role::Leader)))
-                .collect(),
-        );
-        self.apply_plan(plan).await?;
+    async fn attach(&mut self) -> Result<MessageEnvelope<AttachResponse>, AttachError> {
+        loop {
+            // We try to get the admin node on every retry since it might change between retries.
+            let admin_node = self
+                .metadata
+                .nodes_config()
+                .get_admin_node()
+                .ok_or(AttachError::NoClusterController)?
+                .current_generation;
+
+            info!(
+                "Attempting to attach to cluster controller '{}'",
+                admin_node
+            );
+            if admin_node == self.metadata.my_node_id() {
+                // If this node is running the cluster controller, we need to wait a little to give cluster
+                // controller time to start up. This is only done to reduce the chances of observing
+                // connection errors in the log. Such logs are benign since we retry, but it's still not nice
+                // to print, especially in a single-node setup.
+                info!("This node is the cluster controller, giving cluster controller service 500ms to start");
+                tokio::time::sleep(Duration::from_millis(500)).await;
+            }
+
+            match self
+                .attach_router
+                .call(admin_node.into(), &AttachRequest::default())
+                .await
+            {
+                Ok(response) => return Ok(response),
+                Err(RpcError::Shutdown(e)) => return Err(AttachError::ShutdownError(e)),
+                Err(e) => {
+                    warn!(
+                        "Failed to send attach message to cluster controller: {}, retrying....",
+                        e
+                    );
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                }
+            }
+        }
+    }
 
+    pub async fn run(mut self) -> anyhow::Result<()> {
         let shutdown = cancellation_watcher();
         tokio::pin!(shutdown);
 
+        // Initial attach
+        let response = tokio::time::timeout(Duration::from_secs(5), self.attach())
+            .await
+            .context("Timeout waiting to attach to a cluster controller")??;
+
+        let (from, msg) = response.split();
+        // We ignore errors due to shutdown
+        let _ = self.apply_plan(&msg.actions);
+        self.latest_attach_response = Some((from, msg.clone()));
+        info!("Plan applied from attaching to controller {}", from);
+
         loop {
             tokio::select! {
                 Some(command) = self.rx.recv() => {
-                    self.handle_command(command).await;
+                    self.on_command(command);
                     debug!("PartitionProcessorManager shutting down");
                 }
                 _ = &mut shutdown => {
@@ -105,7 +181,7 @@ impl PartitionProcessorManager {
         }
     }
 
-    async fn handle_command(&mut self, command: ProcessorsManagerCommand) {
+    fn on_command(&mut self, command: ProcessorsManagerCommand) {
         use ProcessorsManagerCommand::*;
         match command {
             GetLivePartitions(sender) => {
@@ -115,69 +191,71 @@ impl PartitionProcessorManager {
         }
     }
 
-    async fn attach_worker(&mut self) -> anyhow::Result<()> {
-        let admin_address = self
-            .metadata
-            .nodes_config()
-            .get_admin_node()
-            .expect("at least one admin node")
-            .address
-            .clone();
-
-        info!("Worker attaching to admin at '{admin_address}'");
-        // todo: use Networking to attach to a cluster admin node.
-        Ok(())
-    }
-
-    #[allow(clippy::map_entry)]
-    pub async fn apply_plan(&mut self, plan: PartitionProcessorPlan) -> anyhow::Result<()> {
+    pub fn apply_plan(&mut self, actions: &[Action]) -> Result<(), ShutdownError> {
         let config = self.updateable_config.pinned();
         let options = &config.worker;
-        let partition_table = self
-            .metadata
-            .wait_for_partition_table(plan.min_required_partition_table_version)
-            .await?;
 
-        for (partition_id, action) in plan.actions {
+        for action in actions {
             match action {
-                Action::Start(role) => {
+                Action::RunPartition(action) => {
+                    #[allow(clippy::map_entry)]
                     if !self
                         .running_partition_processors
-                        .contains_key(&partition_id)
+                        .contains_key(&action.partition_id)
                     {
-                        let partition_range = partition_table
-                            .partition_range(partition_id)
-                            .expect("partition_range to be known");
-                        let task_id = self.spawn_partition_processor(
+                        let (control_tx, control_rx) = mpsc::channel(2);
+                        let status = PartitionProcessorStatus::new(action.mode);
+                        let (watch_tx, watch_rx) = watch::channel(status.clone());
+
+                        let _task_id = self.spawn_partition_processor(
                             options,
-                            partition_id,
-                            partition_range,
-                            role,
+                            action.partition_id,
+                            action.key_range_inclusive.clone().into(),
+                            status,
+                            control_rx,
+                            watch_tx,
                         )?;
+                        let state = State {
+                            _created_at: MillisSinceEpoch::now(),
+                            _key_range: action.key_range_inclusive.clone().into(),
+                            _task_id,
+                            _control_tx: control_tx,
+                            _watch_rx: watch_rx,
+                        };
                         self.running_partition_processors
-                            .insert(partition_id, task_id);
+                            .insert(action.partition_id, state);
                     } else {
                         debug!(
                             "Partition processor for partition id '{}' is already running.",
-                            partition_id
+                            action.partition_id
                         );
                    }
                }
            }
        }
-
        Ok(())
    }
 
     fn spawn_partition_processor(
-        &mut self,
+        &self,
         options: &WorkerOptions,
         partition_id: PartitionId,
-        partition_range: RangeInclusive,
-        role: Role,
+        key_range: RangeInclusive<PartitionKey>,
+        status: PartitionProcessorStatus,
+        control_rx: mpsc::Receiver<PartitionProcessorControlCommand>,
+        watch_tx: watch::Sender<PartitionProcessorStatus>,
     ) -> Result {
-        let processor =
-            self.create_partition_processor(options, partition_id, partition_range.clone());
+        let planned_mode = status.planned_mode;
+        let processor = PartitionProcessor::new(
+            partition_id,
+            key_range.clone(),
+            status,
+            options.num_timers_in_memory_limit(),
+            options.internal_queue_length(),
+            control_rx,
+            watch_tx,
+            self.invoker_handle.clone(),
+        );
         let networking = self.networking.clone();
         let mut bifrost = self.bifrost.clone();
         let metadata_store_client = self.metadata_store_client.clone();
@@ -194,18 +272,18 @@ impl PartitionProcessorManager {
             let partition_store = storage_manager
                 .open_partition_store(
                     partition_id,
-                    partition_range.clone(),
+                    key_range.clone(),
                     OpenMode::CreateIfMissing,
                     &options.storage.rocksdb,
                 )
                 .await?;
 
-            if role == Role::Leader {
+            if planned_mode == RunMode::Leader {
                 Self::claim_leadership(
                     &mut bifrost,
                     metadata_store_client,
                     partition_id,
-                    partition_range,
+                    key_range,
                     node_id,
                 )
                 .await?;
@@ -217,21 +295,6 @@ impl PartitionProcessorManager {
         )
     }
 
-    fn create_partition_processor(
-        &self,
-        options: &WorkerOptions,
-        partition_id: PartitionId,
-        partition_key_range: RangeInclusive,
-    ) -> PartitionProcessor {
-        PartitionProcessor::new(
-            partition_id,
-            partition_key_range,
-            options.num_timers_in_memory_limit(),
-            options.internal_queue_length(),
-            self.invoker_handle.clone(),
-        )
-    }
-
     async fn claim_leadership(
         bifrost: &mut Bifrost,
         metadata_store_client: MetadataStoreClient,
@@ -303,33 +366,3 @@ impl PartitionProcessorManager {
         Ok(())
     }
 }
-
-#[derive(Debug)]
-pub struct PartitionProcessorPlan {
-    min_required_partition_table_version: Version,
-    actions: HashMap,
-}
-
-impl PartitionProcessorPlan {
-    pub fn new(
-        min_required_partition_table_version: Version,
-        actions: HashMap,
-    ) -> Self {
-        Self {
-            min_required_partition_table_version,
-            actions,
-        }
-    }
-}
-
-#[derive(Debug)]
-pub enum Action {
-    Start(Role),
-}
-
-#[derive(Debug, PartialEq)]
-pub enum Role {
-    Leader,
-    #[allow(dead_code)]
-    Follower,
-}