From 05eb4ba33f0b5467395ae098f5d7dcfd4b55b0ce Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 29 Apr 2024 09:23:05 +0100 Subject: [PATCH] Partition-scoped storage reader in invoker This changes the invoker to allow partitions to pass down scoped storage readers along registration messages in invoker. This means that the invoker uses the partition-supplied storage reader for invocations owned by this particular partition. Additionally, this attempts to consolidate (in most places) the journal reader and state reader under a single generic type. --- crates/invoker-api/src/handle.rs | 3 +- crates/invoker-api/src/journal_reader.rs | 25 --- crates/invoker-api/src/lib.rs | 41 +++++ crates/invoker-api/src/state_reader.rs | 24 --- crates/invoker-impl/src/input_command.rs | 12 +- crates/invoker-impl/src/invocation_task.rs | 12 +- crates/invoker-impl/src/lib.rs | 150 +++++++++++------- .../invoker-impl/src/state_machine_manager.rs | 37 ++++- crates/worker/src/lib.rs | 12 +- crates/worker/src/partition/leadership/mod.rs | 12 +- crates/worker/src/partition/mod.rs | 5 +- crates/worker/src/partition/storage/mod.rs | 5 + .../worker/src/partition_processor_manager.rs | 7 +- 13 files changed, 203 insertions(+), 142 deletions(-) diff --git a/crates/invoker-api/src/handle.rs b/crates/invoker-api/src/handle.rs index 9b551cca3..0be8bc309 100644 --- a/crates/invoker-api/src/handle.rs +++ b/crates/invoker-api/src/handle.rs @@ -28,7 +28,7 @@ pub enum InvokeInputJournal { CachedJournal(JournalMetadata, Vec), } -pub trait ServiceHandle { +pub trait ServiceHandle { type Future: Future>; fn invoke( @@ -65,6 +65,7 @@ pub trait ServiceHandle { &mut self, partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, ) -> Self::Future; } diff --git a/crates/invoker-api/src/journal_reader.rs b/crates/invoker-api/src/journal_reader.rs index 478570294..a98f7ccfb 100644 --- a/crates/invoker-api/src/journal_reader.rs +++ b/crates/invoker-api/src/journal_reader.rs @@ -46,28 +46,3 @@ pub trait JournalReader { fid: &'a InvocationId, ) -> impl Future> + Send; } - -#[cfg(any(test, feature = "mocks"))] -pub mod mocks { - use super::*; - use restate_types::invocation::ServiceInvocationSpanContext; - use std::convert::Infallible; - - #[derive(Debug, Clone)] - pub struct EmptyJournalReader; - - impl JournalReader for EmptyJournalReader { - type JournalStream = futures::stream::Empty; - type Error = Infallible; - - async fn read_journal<'a>( - &'a mut self, - _sid: &'a InvocationId, - ) -> Result<(JournalMetadata, Self::JournalStream), Self::Error> { - Ok(( - JournalMetadata::new(0, ServiceInvocationSpanContext::empty(), None), - futures::stream::empty(), - )) - } - } -} diff --git a/crates/invoker-api/src/lib.rs b/crates/invoker-api/src/lib.rs index 23c61cfbb..00aecdf47 100644 --- a/crates/invoker-api/src/lib.rs +++ b/crates/invoker-api/src/lib.rs @@ -21,3 +21,44 @@ pub use handle::*; pub use journal_reader::{JournalMetadata, JournalReader}; pub use state_reader::{EagerState, StateReader}; pub use status_handle::{InvocationErrorReport, InvocationStatusReport, StatusHandle}; + +#[cfg(any(test, feature = "mocks"))] +pub mod mocks { + use super::*; + use bytes::Bytes; + use restate_types::identifiers::{InvocationId, ServiceId}; + use restate_types::invocation::ServiceInvocationSpanContext; + use restate_types::journal::raw::PlainRawEntry; + use std::convert::Infallible; + use std::iter::empty; + + #[derive(Debug, Clone)] + pub struct EmptyStorageReader; + + impl JournalReader for EmptyStorageReader { + type JournalStream = futures::stream::Empty; + type Error = Infallible; + + async fn read_journal<'a>( + &'a mut self, + _sid: &'a InvocationId, + ) -> Result<(JournalMetadata, Self::JournalStream), Self::Error> { + Ok(( + JournalMetadata::new(0, ServiceInvocationSpanContext::empty(), None), + futures::stream::empty(), + )) + } + } + + impl StateReader for EmptyStorageReader { + type StateIter = std::iter::Empty<(Bytes, Bytes)>; + type Error = Infallible; + + async fn read_state<'a>( + &'a mut self, + _service_id: &'a ServiceId, + ) -> Result, Self::Error> { + Ok(EagerState::new_complete(empty())) + } + } +} diff --git a/crates/invoker-api/src/state_reader.rs b/crates/invoker-api/src/state_reader.rs index 2341a8600..e3a57affe 100644 --- a/crates/invoker-api/src/state_reader.rs +++ b/crates/invoker-api/src/state_reader.rs @@ -75,27 +75,3 @@ pub trait StateReader { service_id: &'a ServiceId, ) -> impl Future, Self::Error>> + Send; } - -#[cfg(any(test, feature = "mocks"))] -pub mod mocks { - use crate::{EagerState, StateReader}; - use bytes::Bytes; - use restate_types::identifiers::ServiceId; - use std::convert::Infallible; - use std::iter::empty; - - #[derive(Debug, Clone)] - pub struct EmptyStateReader; - - impl StateReader for EmptyStateReader { - type StateIter = std::iter::Empty<(Bytes, Bytes)>; - type Error = Infallible; - - async fn read_state<'a>( - &'a mut self, - _service_id: &'a ServiceId, - ) -> Result, Self::Error> { - Ok(EagerState::new_complete(empty())) - } - } -} diff --git a/crates/invoker-impl/src/input_command.rs b/crates/invoker-impl/src/input_command.rs index 7758be183..fd840e68c 100644 --- a/crates/invoker-impl/src/input_command.rs +++ b/crates/invoker-impl/src/input_command.rs @@ -30,7 +30,7 @@ pub(crate) struct InvokeCommand { } #[derive(Debug)] -pub(crate) enum InputCommand { +pub(crate) enum InputCommand { Invoke(InvokeCommand), Completion { partition: PartitionLeaderEpoch, @@ -58,6 +58,7 @@ pub(crate) enum InputCommand { RegisterPartition { partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, }, } @@ -65,13 +66,12 @@ pub(crate) enum InputCommand { // -- Handles implementations. This is just glue code between the Input and the interfaces #[derive(Debug, Clone)] -pub struct ChannelServiceHandle { - pub(super) input: mpsc::UnboundedSender, +pub struct InvokerHandle { + pub(super) input: mpsc::UnboundedSender>, } -impl ServiceHandle for ChannelServiceHandle { +impl ServiceHandle for InvokerHandle { type Future = futures::future::Ready>; - fn invoke( &mut self, partition: PartitionLeaderEpoch, @@ -152,6 +152,7 @@ impl ServiceHandle for ChannelServiceHandle { &mut self, partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, ) -> Self::Future { futures::future::ready( @@ -160,6 +161,7 @@ impl ServiceHandle for ChannelServiceHandle { partition, partition_key_range, sender, + storage_reader, }) .map_err(|_| NotRunningError), ) diff --git a/crates/invoker-impl/src/invocation_task.rs b/crates/invoker-impl/src/invocation_task.rs index 30d4a55d6..db727deae 100644 --- a/crates/invoker-impl/src/invocation_task.rs +++ b/crates/invoker-impl/src/invocation_task.rs @@ -240,7 +240,7 @@ impl From for InvocationTaskOutputInner { } /// Represents an open invocation stream -pub(super) struct InvocationTask { +pub(super) struct InvocationTask { // Shared client client: ServiceClient, @@ -253,8 +253,8 @@ pub(super) struct InvocationTask { disable_eager_state: bool, // Invoker tx/rx - journal_reader: JR, state_reader: SR, + journal_reader: JR, entry_enricher: EE, deployment_metadata_resolver: DMR, invoker_tx: mpsc::UnboundedSender, @@ -297,11 +297,11 @@ macro_rules! shortcircuit { }; } -impl InvocationTask +impl InvocationTask where + SR: StateReader + StateReader + Clone + Send + Sync + 'static, JR: JournalReader + Clone + Send + Sync + 'static, ::JournalStream: Unpin + Send + 'static, - SR: StateReader + Clone + Send + Sync + 'static, ::StateIter: Send, EE: EntryEnricher, DMR: DeploymentResolver, @@ -318,8 +318,8 @@ where disable_eager_state: bool, message_size_warning: usize, message_size_limit: Option, - journal_reader: JR, state_reader: SR, + journal_reader: JR, entry_enricher: EE, deployment_metadata_resolver: DMR, invoker_tx: mpsc::UnboundedSender, @@ -334,8 +334,8 @@ where abort_timeout, disable_eager_state, next_journal_index: 0, - journal_reader, state_reader, + journal_reader, entry_enricher, deployment_metadata_resolver, invoker_tx, diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index ba853ec36..5f890070f 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -53,8 +53,8 @@ use tracing::instrument; use tracing::{debug, trace}; use crate::invocation_task::InvocationTaskError; -pub use input_command::ChannelServiceHandle; pub use input_command::ChannelStatusReader; +pub use input_command::InvokerHandle; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_service_protocol::RESTATE_SERVICE_PROTOCOL_VERSION; use restate_types::invocation::InvocationTarget; @@ -72,7 +72,7 @@ pub(crate) enum Notification { // -- InvocationTask factory: we use this to mock the state machine in tests -trait InvocationTaskRunner { +trait InvocationTaskRunner { #[allow(clippy::too_many_arguments)] fn start_invocation_task( &self, @@ -80,6 +80,7 @@ trait InvocationTaskRunner { partition: PartitionLeaderEpoch, invocation_id: InvocationId, invocation_target: InvocationTarget, + storage_reader: SR, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, input_journal: InvokeInputJournal, @@ -88,19 +89,16 @@ trait InvocationTaskRunner { } #[derive(Debug)] -struct DefaultInvocationTaskRunner { +struct DefaultInvocationTaskRunner { client: ServiceClient, - journal_reader: JR, - state_reader: SR, entry_enricher: EE, deployment_metadata_resolver: DMR, } -impl InvocationTaskRunner for DefaultInvocationTaskRunner +impl InvocationTaskRunner for DefaultInvocationTaskRunner where - JR: JournalReader + Clone + Send + Sync + 'static, - ::JournalStream: Unpin + Send + 'static, - SR: StateReader + Clone + Send + Sync + 'static, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, ::StateIter: Send, EE: EntryEnricher + Clone + Send + 'static, DMR: DeploymentResolver + Clone + Send + 'static, @@ -111,6 +109,7 @@ where partition: PartitionLeaderEpoch, invocation_id: InvocationId, invocation_target: InvocationTarget, + storage_reader: SR, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, input_journal: InvokeInputJournal, @@ -128,8 +127,8 @@ where opts.disable_eager_state, opts.message_size_warning.get(), opts.message_size_limit(), - self.journal_reader.clone(), - self.state_reader.clone(), + storage_reader.clone(), + storage_reader, self.entry_enricher.clone(), self.deployment_metadata_resolver.clone(), invoker_tx, @@ -143,9 +142,9 @@ where // -- Service implementation #[derive(Debug)] -pub struct Service { +pub struct Service { // Used for constructing the invoker sender and status reader - input_tx: mpsc::UnboundedSender, + input_tx: mpsc::UnboundedSender>, status_tx: mpsc::UnboundedSender< restate_futures_util::command::Command< RangeInclusive, @@ -156,21 +155,23 @@ pub struct Service, - >, + inner: ServiceInner, SR>, } -impl Service { +impl Service { #[allow(clippy::too_many_arguments)] - pub(crate) fn new( + pub(crate) fn new( options: &InvokerOptions, deployment_metadata_resolver: DMR, client: ServiceClient, - journal_reader: JR, - state_reader: SR, entry_enricher: EE, - ) -> Service { + ) -> Service + where + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + JS: Stream + Unpin + Send + 'static, + EE: EntryEnricher, + DMR: DeploymentResolver, + { let (input_tx, input_rx) = mpsc::unbounded_channel(); let (status_tx, status_rx) = mpsc::unbounded_channel(); let (invocation_tasks_tx, invocation_tasks_rx) = mpsc::unbounded_channel(); @@ -186,8 +187,6 @@ impl Service { invocation_tasks_rx, invocation_task_runner: DefaultInvocationTaskRunner { client, - journal_reader, - state_reader, entry_enricher, deployment_metadata_resolver, }, @@ -203,13 +202,11 @@ impl Service { pub fn from_options( service_client_options: &ServiceClientOptions, invoker_options: &InvokerOptions, - journal_reader: JR, - state_reader: SR, entry_enricher: EE, deployment_registry: DMR, - ) -> Result, BuildError> + ) -> Result, BuildError> where - JR: JournalReader + Clone + Send + Sync + 'static, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, JS: Stream + Unpin + Send + 'static, EE: EntryEnricher, DMR: DeploymentResolver, @@ -222,8 +219,6 @@ impl Service { invoker_options, deployment_registry, client, - journal_reader, - state_reader, entry_enricher, )) } @@ -235,17 +230,16 @@ pub enum BuildError { ServiceClient(#[from] restate_service_client::BuildError), } -impl Service +impl Service where - JR: JournalReader + Clone + Send + Sync + 'static, - ::JournalStream: Unpin + Send + 'static, - SR: StateReader + Clone + Send + Sync + 'static, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, ::StateIter: Send, EE: EntryEnricher + Clone + Send + 'static, EMR: DeploymentResolver + Clone + Send + 'static, { - pub fn handle(&self) -> ChannelServiceHandle { - ChannelServiceHandle { + pub fn handle(&self) -> InvokerHandle { + InvokerHandle { input: self.input_tx.clone(), } } @@ -289,8 +283,8 @@ where } #[derive(Debug)] -struct ServiceInner { - input_rx: mpsc::UnboundedReceiver, +struct ServiceInner { + input_rx: mpsc::UnboundedReceiver>, status_rx: mpsc::UnboundedReceiver< restate_futures_util::command::Command< RangeInclusive, @@ -310,12 +304,15 @@ struct ServiceInner { retry_timers: TimerQueue<(PartitionLeaderEpoch, InvocationId)>, quota: quota::InvokerConcurrencyQuota, status_store: InvocationStatusStore, - invocation_state_machine_manager: state_machine_manager::InvocationStateMachineManager, + invocation_state_machine_manager: state_machine_manager::InvocationStateMachineManager, } -impl ServiceInner +impl ServiceInner where - ITR: InvocationTaskRunner, + ITR: InvocationTaskRunner, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, + ::StateIter: Send, { // Returns true if we should execute another step, false if we should stop executing steps async fn step( @@ -347,8 +344,9 @@ where segmented_input_queue.enqueue(invoke_command).await; }, // --- Other commands (they don't go through the segment queue) - InputCommand::RegisterPartition { partition, partition_key_range, sender } => { - self.handle_register_partition(partition, partition_key_range, sender); + InputCommand::RegisterPartition { partition, partition_key_range, storage_reader, sender, } => { + self.handle_register_partition(partition, partition_key_range, + storage_reader, sender); }, InputCommand::Abort { partition, invocation_id } => { self.handle_abort_invocation(partition, invocation_id); @@ -448,11 +446,13 @@ where &mut self, partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, ) { self.invocation_state_machine_manager.register_partition( partition, partition_key_range, + storage_reader, sender, ); } @@ -484,10 +484,15 @@ where .resolve_invocation(partition, &invocation_id) .is_none()); + let storage_reader = self + .invocation_state_machine_manager + .partition_storage_reader(partition) + .expect("partition is registered"); self.quota.reserve_slot(); self.start_invocation_task( options, partition, + storage_reader.clone(), invocation_id, journal, InvocationStateMachine::create(invocation_target, options.retry_policy.clone()), @@ -706,7 +711,7 @@ where partition: PartitionLeaderEpoch, invocation_id: InvocationId, ) { - if let Some((sender, ism)) = self + if let Some((sender, _, ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -742,7 +747,7 @@ where invocation_id: InvocationId, entry_indexes: HashSet, ) { - if let Some((sender, ism)) = self + if let Some((sender, _, ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -780,7 +785,7 @@ where invocation_id: InvocationId, error: InvocationTaskError, ) { - if let Some((_, ism)) = self + if let Some((_, _, ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -805,7 +810,7 @@ where partition: PartitionLeaderEpoch, invocation_id: InvocationId, ) { - if let Some((_, mut ism)) = self + if let Some((_, _, mut ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -927,6 +932,7 @@ where &mut self, options: &InvokerOptions, partition: PartitionLeaderEpoch, + storage_reader: SR, invocation_id: InvocationId, journal: InvokeInputJournal, mut ism: InvocationStateMachine, @@ -938,6 +944,7 @@ where partition, invocation_id, ism.invocation_target.clone(), + storage_reader, self.invocation_tasks_tx.clone(), completions_rx, journal, @@ -966,7 +973,7 @@ where ) where FN: FnOnce(&mut InvocationStateMachine), { - if let Some((_, mut ism)) = self + if let Some((_, storage_reader, mut ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -975,9 +982,11 @@ where trace!( restate.invocation.target = %ism.invocation_target, "Going to retry now"); + let storage_reader = storage_reader.clone(); self.start_invocation_task( options, partition, + storage_reader, invocation_id, InvokeInputJournal::NoCachedJournal, ism, @@ -1014,6 +1023,7 @@ mod tests { use bytes::Bytes; use restate_core::TaskKind; use restate_core::TestCoreEnv; + use restate_invoker_api::mocks::EmptyStorageReader; use restate_types::arc_util::Constant; use restate_types::config::InvokerOptionsBuilder; use tempfile::tempdir; @@ -1021,7 +1031,7 @@ mod tests { use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; - use restate_invoker_api::{entry_enricher, journal_reader, state_reader, ServiceHandle}; + use restate_invoker_api::{entry_enricher, ServiceHandle}; use restate_schema_api::deployment::mocks::MockDeploymentMetadataRegistry; use restate_test_util::{check, let_assert}; use restate_types::identifiers::LeaderEpoch; @@ -1036,13 +1046,18 @@ mod tests { const MOCK_PARTITION: PartitionLeaderEpoch = (0, LeaderEpoch::INITIAL); - impl ServiceInner { + impl ServiceInner + where + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, + ::StateIter: Send, + { #[allow(clippy::type_complexity)] fn mock( invocation_task_runner: ITR, concurrency_limit: Option, ) -> ( - mpsc::UnboundedSender, + mpsc::UnboundedSender>, mpsc::UnboundedSender< restate_futures_util::command::Command< RangeInclusive, @@ -1070,26 +1085,35 @@ mod tests { (input_tx, status_tx, service_inner) } - fn register_mock_partition(&mut self) -> mpsc::Receiver + fn register_mock_partition(&mut self, storage_reader: SR) -> mpsc::Receiver where - ITR: InvocationTaskRunner, + ITR: InvocationTaskRunner, { let (partition_tx, partition_rx) = mpsc::channel(1024); - self.handle_register_partition(MOCK_PARTITION, RangeInclusive::new(0, 0), partition_tx); + self.handle_register_partition( + MOCK_PARTITION, + RangeInclusive::new(0, 0), + storage_reader, + partition_tx, + ); partition_rx } } - impl InvocationTaskRunner for F + impl InvocationTaskRunner for F where F: Fn( PartitionLeaderEpoch, InvocationId, InvocationTarget, + SR, mpsc::UnboundedSender, mpsc::UnboundedReceiver, InvokeInputJournal, ) -> Fut, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, + ::StateIter: Send, Fut: Future + Send + 'static, { fn start_invocation_task( @@ -1098,6 +1122,7 @@ mod tests { partition: PartitionLeaderEpoch, invocation_id: InvocationId, invocation_target: InvocationTarget, + storage_reader: SR, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, input_journal: InvokeInputJournal, @@ -1107,6 +1132,7 @@ mod tests { partition, invocation_id, invocation_target, + storage_reader, invoker_tx, invoker_rx, input_journal, @@ -1137,8 +1163,6 @@ mod tests { restate_service_client::AssumeRoleCacheMode::None, ) .unwrap(), - journal_reader::mocks::EmptyJournalReader, - state_reader::mocks::EmptyStateReader, entry_enricher::mocks::MockEntryEnricher, ); @@ -1160,7 +1184,12 @@ mod tests { let (output_tx, mut output_rx) = mpsc::channel(1); handle - .register_partition(partition_leader_epoch, RangeInclusive::new(0, 0), output_tx) + .register_partition( + partition_leader_epoch, + RangeInclusive::new(0, 0), + EmptyStorageReader, + output_tx, + ) .await .unwrap(); handle @@ -1203,8 +1232,8 @@ mod tests { let invocation_id_2 = InvocationId::mock_random(); let (_invoker_tx, _status_tx, mut service_inner) = - ServiceInner::mock(|_, _, _, _, _, _| ready(()), Some(1)); - let _ = service_inner.register_mock_partition(); + ServiceInner::mock(|_, _, _, _, _, _, _| ready(()), Some(1)); + let _ = service_inner.register_mock_partition(EmptyStorageReader); // Enqueue sid_1 and sid_2 segment_queue @@ -1295,6 +1324,7 @@ mod tests { |partition, invocation_id, _service_id, + _storage_reader, invoker_tx: mpsc::UnboundedSender, _, _| { @@ -1311,7 +1341,7 @@ mod tests { }, Some(2), ); - let _ = service_inner.register_mock_partition(); + let _ = service_inner.register_mock_partition(EmptyStorageReader); // Invoke the service service_inner diff --git a/crates/invoker-impl/src/state_machine_manager.rs b/crates/invoker-impl/src/state_machine_manager.rs index 08a07bd11..8d18965e9 100644 --- a/crates/invoker-impl/src/state_machine_manager.rs +++ b/crates/invoker-impl/src/state_machine_manager.rs @@ -15,24 +15,43 @@ use restate_invoker_api::Effect; use restate_types::identifiers::PartitionKey; /// Tree of [InvocationStateMachine] held by the [Service]. -#[derive(Debug, Default)] -pub(super) struct InvocationStateMachineManager { - partitions: HashMap, +#[derive(Debug)] +pub(super) struct InvocationStateMachineManager { + partitions: HashMap>, +} + +impl Default for InvocationStateMachineManager { + fn default() -> Self { + InvocationStateMachineManager { + partitions: Default::default(), + } + } } #[derive(Debug)] -struct PartitionInvocationStateMachineCoordinator { +struct PartitionInvocationStateMachineCoordinator { output_tx: mpsc::Sender, invocation_state_machines: HashMap, partition_key_range: RangeInclusive, + storage_reader: SR, } -impl InvocationStateMachineManager { +impl InvocationStateMachineManager +where + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, + ::StateIter: Send, +{ #[inline] pub(super) fn has_partition(&self, partition: PartitionLeaderEpoch) -> bool { self.partitions.contains_key(&partition) } + #[inline] + pub(super) fn partition_storage_reader(&self, partition: PartitionLeaderEpoch) -> Option<&SR> { + self.partitions.get(&partition).map(|p| &p.storage_reader) + } + #[inline] pub(super) fn resolve_partition_sender( &self, @@ -59,11 +78,11 @@ impl InvocationStateMachineManager { &mut self, partition: PartitionLeaderEpoch, invocation_id: &InvocationId, - ) -> Option<(&mpsc::Sender, InvocationStateMachine)> { + ) -> Option<(&mpsc::Sender, &SR, InvocationStateMachine)> { self.resolve_partition(partition).and_then(|p| { p.invocation_state_machines .remove(invocation_id) - .map(|ism| (&p.output_tx, ism)) + .map(|ism| (&p.output_tx, &p.storage_reader, ism)) }) } @@ -82,6 +101,7 @@ impl InvocationStateMachineManager { &mut self, partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, ) { self.partitions.insert( @@ -90,6 +110,7 @@ impl InvocationStateMachineManager { output_tx: sender, invocation_state_machines: Default::default(), partition_key_range, + storage_reader, }, ); } @@ -134,7 +155,7 @@ impl InvocationStateMachineManager { fn resolve_partition( &mut self, partition: PartitionLeaderEpoch, - ) -> Option<&mut PartitionInvocationStateMachineCoordinator> { + ) -> Option<&mut PartitionInvocationStateMachineCoordinator> { self.partitions.get_mut(&partition) } } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 07009401c..88ecdb108 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -34,7 +34,7 @@ use restate_ingress_dispatcher::IngressDispatcher; use restate_ingress_http::HyperServerIngress; use restate_ingress_kafka::Service as IngressKafkaService; use restate_invoker_impl::{ - ChannelServiceHandle as InvokerChannelServiceHandle, Service as InvokerService, + InvokerHandle as InvokerChannelServiceHandle, Service as InvokerService, }; use restate_metadata_store::MetadataStoreClient; use restate_network::Networking; @@ -51,8 +51,10 @@ use crate::partition_processor_manager::{ }; use restate_types::Version; -type PartitionProcessor = - partition::PartitionProcessor; +type PartitionProcessor = partition::PartitionProcessor< + ProtobufRawEntryCodec, + InvokerChannelServiceHandle>, +>; type ExternalClientIngress = HyperServerIngress; @@ -92,7 +94,6 @@ pub struct Worker { storage_query_postgres: PostgresQueryService, #[allow(clippy::type_complexity)] invoker: InvokerService< - InvokerStorageReader, InvokerStorageReader, EntryEnricher, UpdateableSchema, @@ -163,12 +164,9 @@ impl Worker { .map_as_updateable_owned(|c| &c.worker.storage.rocksdb), ))?; - let invoker_storage_reader = InvokerStorageReader::new(rocksdb_storage.clone()); let invoker = InvokerService::from_options( &config.common.service_client, &config.worker.invoker, - invoker_storage_reader.clone(), - invoker_storage_reader, EntryEnricher::new(schema_view.clone()), schema_view.clone(), )?; diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 8c80e9426..0ca108dd0 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -39,6 +39,8 @@ use restate_types::identifiers::{InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; use restate_wal_protocol::timer::TimerValue; +use super::storage::invoker::InvokerStorageReader; + type PartitionStorage = storage::PartitionStorage; type TimerService = restate_timer::TimerService; @@ -82,7 +84,7 @@ pub(crate) enum LeadershipState { impl LeadershipState where - InvokerInputSender: restate_invoker_api::ServiceHandle, + InvokerInputSender: restate_invoker_api::ServiceHandle>, { #[allow(clippy::too_many_arguments)] pub(crate) fn follower( @@ -213,8 +215,14 @@ where ) -> Result, Error> { let (invoker_tx, invoker_rx) = mpsc::channel(channel_size); + let storage = partition_storage.clone_storage(); invoker_handle - .register_partition(partition_leader_epoch, partition_key_range, invoker_tx) + .register_partition( + partition_leader_epoch, + partition_key_range, + InvokerStorageReader::new(storage), + invoker_tx, + ) .await .map_err(Error::Invoker)?; diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index e2e853a42..5deefeeb9 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -41,6 +41,8 @@ use restate_types::logs::{LogId, Lsn, SequenceNumber}; use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::{Command, Destination, Envelope, Header}; +use self::storage::invoker::InvokerStorageReader; + #[derive(Debug)] pub(super) struct PartitionProcessor { pub partition_id: PartitionId, @@ -59,7 +61,8 @@ pub(super) struct PartitionProcessor { impl PartitionProcessor where RawEntryCodec: restate_types::journal::raw::RawEntryCodec + Default + Debug, - InvokerInputSender: restate_invoker_api::ServiceHandle + Clone, + InvokerInputSender: + restate_invoker_api::ServiceHandle> + Clone, { #[allow(clippy::too_many_arguments)] pub(super) fn new( diff --git a/crates/worker/src/partition/storage/mod.rs b/crates/worker/src/partition/storage/mod.rs index da86bb9ce..f6d49e84c 100644 --- a/crates/worker/src/partition/storage/mod.rs +++ b/crates/worker/src/partition/storage/mod.rs @@ -101,8 +101,13 @@ where + ReadOnlyVirtualObjectStatusTable + ReadOnlyJournalTable + ReadOnlyStateTable + + Clone + Send, { + pub fn clone_storage(&self) -> Storage { + self.storage.clone() + } + pub fn load_inbox_seq_number( &mut self, ) -> impl Future> + Send + '_ { diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index e902e4ead..121732c50 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -8,11 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::partition::storage::invoker::InvokerStorageReader; use crate::PartitionProcessor; use anyhow::Context; use restate_bifrost::Bifrost; use restate_core::{metadata, task_center, ShutdownError, TaskId, TaskKind}; -use restate_invoker_impl::ChannelServiceHandle; +use restate_invoker_impl::InvokerHandle; use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError}; use restate_network::Networking; use restate_storage_rocksdb::RocksDBStorage; @@ -38,7 +39,7 @@ pub struct PartitionProcessorManager { rocksdb_storage: RocksDBStorage, networking: Networking, bifrost: Bifrost, - invoker_handle: ChannelServiceHandle, + invoker_handle: InvokerHandle>, } impl PartitionProcessorManager { @@ -49,7 +50,7 @@ impl PartitionProcessorManager { rocksdb_storage: RocksDBStorage, networking: Networking, bifrost: Bifrost, - invoker_handle: ChannelServiceHandle, + invoker_handle: InvokerHandle>, ) -> Self { Self { updateable_config,