diff --git a/crates/invoker-api/src/handle.rs b/crates/invoker-api/src/handle.rs index 9b551cca3..0be8bc309 100644 --- a/crates/invoker-api/src/handle.rs +++ b/crates/invoker-api/src/handle.rs @@ -28,7 +28,7 @@ pub enum InvokeInputJournal { CachedJournal(JournalMetadata, Vec), } -pub trait ServiceHandle { +pub trait ServiceHandle { type Future: Future>; fn invoke( @@ -65,6 +65,7 @@ pub trait ServiceHandle { &mut self, partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, ) -> Self::Future; } diff --git a/crates/invoker-api/src/journal_reader.rs b/crates/invoker-api/src/journal_reader.rs index 478570294..a98f7ccfb 100644 --- a/crates/invoker-api/src/journal_reader.rs +++ b/crates/invoker-api/src/journal_reader.rs @@ -46,28 +46,3 @@ pub trait JournalReader { fid: &'a InvocationId, ) -> impl Future> + Send; } - -#[cfg(any(test, feature = "mocks"))] -pub mod mocks { - use super::*; - use restate_types::invocation::ServiceInvocationSpanContext; - use std::convert::Infallible; - - #[derive(Debug, Clone)] - pub struct EmptyJournalReader; - - impl JournalReader for EmptyJournalReader { - type JournalStream = futures::stream::Empty; - type Error = Infallible; - - async fn read_journal<'a>( - &'a mut self, - _sid: &'a InvocationId, - ) -> Result<(JournalMetadata, Self::JournalStream), Self::Error> { - Ok(( - JournalMetadata::new(0, ServiceInvocationSpanContext::empty(), None), - futures::stream::empty(), - )) - } - } -} diff --git a/crates/invoker-api/src/lib.rs b/crates/invoker-api/src/lib.rs index 23c61cfbb..00aecdf47 100644 --- a/crates/invoker-api/src/lib.rs +++ b/crates/invoker-api/src/lib.rs @@ -21,3 +21,44 @@ pub use handle::*; pub use journal_reader::{JournalMetadata, JournalReader}; pub use state_reader::{EagerState, StateReader}; pub use status_handle::{InvocationErrorReport, InvocationStatusReport, StatusHandle}; + +#[cfg(any(test, feature = "mocks"))] +pub mod mocks { + use super::*; + use bytes::Bytes; + use restate_types::identifiers::{InvocationId, ServiceId}; + use restate_types::invocation::ServiceInvocationSpanContext; + use restate_types::journal::raw::PlainRawEntry; + use std::convert::Infallible; + use std::iter::empty; + + #[derive(Debug, Clone)] + pub struct EmptyStorageReader; + + impl JournalReader for EmptyStorageReader { + type JournalStream = futures::stream::Empty; + type Error = Infallible; + + async fn read_journal<'a>( + &'a mut self, + _sid: &'a InvocationId, + ) -> Result<(JournalMetadata, Self::JournalStream), Self::Error> { + Ok(( + JournalMetadata::new(0, ServiceInvocationSpanContext::empty(), None), + futures::stream::empty(), + )) + } + } + + impl StateReader for EmptyStorageReader { + type StateIter = std::iter::Empty<(Bytes, Bytes)>; + type Error = Infallible; + + async fn read_state<'a>( + &'a mut self, + _service_id: &'a ServiceId, + ) -> Result, Self::Error> { + Ok(EagerState::new_complete(empty())) + } + } +} diff --git a/crates/invoker-api/src/state_reader.rs b/crates/invoker-api/src/state_reader.rs index 2341a8600..e3a57affe 100644 --- a/crates/invoker-api/src/state_reader.rs +++ b/crates/invoker-api/src/state_reader.rs @@ -75,27 +75,3 @@ pub trait StateReader { service_id: &'a ServiceId, ) -> impl Future, Self::Error>> + Send; } - -#[cfg(any(test, feature = "mocks"))] -pub mod mocks { - use crate::{EagerState, StateReader}; - use bytes::Bytes; - use restate_types::identifiers::ServiceId; - use std::convert::Infallible; - use std::iter::empty; - - #[derive(Debug, Clone)] - pub struct EmptyStateReader; - - impl StateReader for EmptyStateReader { - type StateIter = std::iter::Empty<(Bytes, Bytes)>; - type Error = Infallible; - - async fn read_state<'a>( - &'a mut self, - _service_id: &'a ServiceId, - ) -> Result, Self::Error> { - Ok(EagerState::new_complete(empty())) - } - } -} diff --git a/crates/invoker-impl/src/input_command.rs b/crates/invoker-impl/src/input_command.rs index 7758be183..fd840e68c 100644 --- a/crates/invoker-impl/src/input_command.rs +++ b/crates/invoker-impl/src/input_command.rs @@ -30,7 +30,7 @@ pub(crate) struct InvokeCommand { } #[derive(Debug)] -pub(crate) enum InputCommand { +pub(crate) enum InputCommand { Invoke(InvokeCommand), Completion { partition: PartitionLeaderEpoch, @@ -58,6 +58,7 @@ pub(crate) enum InputCommand { RegisterPartition { partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, }, } @@ -65,13 +66,12 @@ pub(crate) enum InputCommand { // -- Handles implementations. This is just glue code between the Input and the interfaces #[derive(Debug, Clone)] -pub struct ChannelServiceHandle { - pub(super) input: mpsc::UnboundedSender, +pub struct InvokerHandle { + pub(super) input: mpsc::UnboundedSender>, } -impl ServiceHandle for ChannelServiceHandle { +impl ServiceHandle for InvokerHandle { type Future = futures::future::Ready>; - fn invoke( &mut self, partition: PartitionLeaderEpoch, @@ -152,6 +152,7 @@ impl ServiceHandle for ChannelServiceHandle { &mut self, partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, ) -> Self::Future { futures::future::ready( @@ -160,6 +161,7 @@ impl ServiceHandle for ChannelServiceHandle { partition, partition_key_range, sender, + storage_reader, }) .map_err(|_| NotRunningError), ) diff --git a/crates/invoker-impl/src/invocation_task.rs b/crates/invoker-impl/src/invocation_task.rs index 30d4a55d6..db727deae 100644 --- a/crates/invoker-impl/src/invocation_task.rs +++ b/crates/invoker-impl/src/invocation_task.rs @@ -240,7 +240,7 @@ impl From for InvocationTaskOutputInner { } /// Represents an open invocation stream -pub(super) struct InvocationTask { +pub(super) struct InvocationTask { // Shared client client: ServiceClient, @@ -253,8 +253,8 @@ pub(super) struct InvocationTask { disable_eager_state: bool, // Invoker tx/rx - journal_reader: JR, state_reader: SR, + journal_reader: JR, entry_enricher: EE, deployment_metadata_resolver: DMR, invoker_tx: mpsc::UnboundedSender, @@ -297,11 +297,11 @@ macro_rules! shortcircuit { }; } -impl InvocationTask +impl InvocationTask where + SR: StateReader + StateReader + Clone + Send + Sync + 'static, JR: JournalReader + Clone + Send + Sync + 'static, ::JournalStream: Unpin + Send + 'static, - SR: StateReader + Clone + Send + Sync + 'static, ::StateIter: Send, EE: EntryEnricher, DMR: DeploymentResolver, @@ -318,8 +318,8 @@ where disable_eager_state: bool, message_size_warning: usize, message_size_limit: Option, - journal_reader: JR, state_reader: SR, + journal_reader: JR, entry_enricher: EE, deployment_metadata_resolver: DMR, invoker_tx: mpsc::UnboundedSender, @@ -334,8 +334,8 @@ where abort_timeout, disable_eager_state, next_journal_index: 0, - journal_reader, state_reader, + journal_reader, entry_enricher, deployment_metadata_resolver, invoker_tx, diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index ba853ec36..5f890070f 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -53,8 +53,8 @@ use tracing::instrument; use tracing::{debug, trace}; use crate::invocation_task::InvocationTaskError; -pub use input_command::ChannelServiceHandle; pub use input_command::ChannelStatusReader; +pub use input_command::InvokerHandle; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_service_protocol::RESTATE_SERVICE_PROTOCOL_VERSION; use restate_types::invocation::InvocationTarget; @@ -72,7 +72,7 @@ pub(crate) enum Notification { // -- InvocationTask factory: we use this to mock the state machine in tests -trait InvocationTaskRunner { +trait InvocationTaskRunner { #[allow(clippy::too_many_arguments)] fn start_invocation_task( &self, @@ -80,6 +80,7 @@ trait InvocationTaskRunner { partition: PartitionLeaderEpoch, invocation_id: InvocationId, invocation_target: InvocationTarget, + storage_reader: SR, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, input_journal: InvokeInputJournal, @@ -88,19 +89,16 @@ trait InvocationTaskRunner { } #[derive(Debug)] -struct DefaultInvocationTaskRunner { +struct DefaultInvocationTaskRunner { client: ServiceClient, - journal_reader: JR, - state_reader: SR, entry_enricher: EE, deployment_metadata_resolver: DMR, } -impl InvocationTaskRunner for DefaultInvocationTaskRunner +impl InvocationTaskRunner for DefaultInvocationTaskRunner where - JR: JournalReader + Clone + Send + Sync + 'static, - ::JournalStream: Unpin + Send + 'static, - SR: StateReader + Clone + Send + Sync + 'static, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, ::StateIter: Send, EE: EntryEnricher + Clone + Send + 'static, DMR: DeploymentResolver + Clone + Send + 'static, @@ -111,6 +109,7 @@ where partition: PartitionLeaderEpoch, invocation_id: InvocationId, invocation_target: InvocationTarget, + storage_reader: SR, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, input_journal: InvokeInputJournal, @@ -128,8 +127,8 @@ where opts.disable_eager_state, opts.message_size_warning.get(), opts.message_size_limit(), - self.journal_reader.clone(), - self.state_reader.clone(), + storage_reader.clone(), + storage_reader, self.entry_enricher.clone(), self.deployment_metadata_resolver.clone(), invoker_tx, @@ -143,9 +142,9 @@ where // -- Service implementation #[derive(Debug)] -pub struct Service { +pub struct Service { // Used for constructing the invoker sender and status reader - input_tx: mpsc::UnboundedSender, + input_tx: mpsc::UnboundedSender>, status_tx: mpsc::UnboundedSender< restate_futures_util::command::Command< RangeInclusive, @@ -156,21 +155,23 @@ pub struct Service, - >, + inner: ServiceInner, SR>, } -impl Service { +impl Service { #[allow(clippy::too_many_arguments)] - pub(crate) fn new( + pub(crate) fn new( options: &InvokerOptions, deployment_metadata_resolver: DMR, client: ServiceClient, - journal_reader: JR, - state_reader: SR, entry_enricher: EE, - ) -> Service { + ) -> Service + where + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + JS: Stream + Unpin + Send + 'static, + EE: EntryEnricher, + DMR: DeploymentResolver, + { let (input_tx, input_rx) = mpsc::unbounded_channel(); let (status_tx, status_rx) = mpsc::unbounded_channel(); let (invocation_tasks_tx, invocation_tasks_rx) = mpsc::unbounded_channel(); @@ -186,8 +187,6 @@ impl Service { invocation_tasks_rx, invocation_task_runner: DefaultInvocationTaskRunner { client, - journal_reader, - state_reader, entry_enricher, deployment_metadata_resolver, }, @@ -203,13 +202,11 @@ impl Service { pub fn from_options( service_client_options: &ServiceClientOptions, invoker_options: &InvokerOptions, - journal_reader: JR, - state_reader: SR, entry_enricher: EE, deployment_registry: DMR, - ) -> Result, BuildError> + ) -> Result, BuildError> where - JR: JournalReader + Clone + Send + Sync + 'static, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, JS: Stream + Unpin + Send + 'static, EE: EntryEnricher, DMR: DeploymentResolver, @@ -222,8 +219,6 @@ impl Service { invoker_options, deployment_registry, client, - journal_reader, - state_reader, entry_enricher, )) } @@ -235,17 +230,16 @@ pub enum BuildError { ServiceClient(#[from] restate_service_client::BuildError), } -impl Service +impl Service where - JR: JournalReader + Clone + Send + Sync + 'static, - ::JournalStream: Unpin + Send + 'static, - SR: StateReader + Clone + Send + Sync + 'static, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, ::StateIter: Send, EE: EntryEnricher + Clone + Send + 'static, EMR: DeploymentResolver + Clone + Send + 'static, { - pub fn handle(&self) -> ChannelServiceHandle { - ChannelServiceHandle { + pub fn handle(&self) -> InvokerHandle { + InvokerHandle { input: self.input_tx.clone(), } } @@ -289,8 +283,8 @@ where } #[derive(Debug)] -struct ServiceInner { - input_rx: mpsc::UnboundedReceiver, +struct ServiceInner { + input_rx: mpsc::UnboundedReceiver>, status_rx: mpsc::UnboundedReceiver< restate_futures_util::command::Command< RangeInclusive, @@ -310,12 +304,15 @@ struct ServiceInner { retry_timers: TimerQueue<(PartitionLeaderEpoch, InvocationId)>, quota: quota::InvokerConcurrencyQuota, status_store: InvocationStatusStore, - invocation_state_machine_manager: state_machine_manager::InvocationStateMachineManager, + invocation_state_machine_manager: state_machine_manager::InvocationStateMachineManager, } -impl ServiceInner +impl ServiceInner where - ITR: InvocationTaskRunner, + ITR: InvocationTaskRunner, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, + ::StateIter: Send, { // Returns true if we should execute another step, false if we should stop executing steps async fn step( @@ -347,8 +344,9 @@ where segmented_input_queue.enqueue(invoke_command).await; }, // --- Other commands (they don't go through the segment queue) - InputCommand::RegisterPartition { partition, partition_key_range, sender } => { - self.handle_register_partition(partition, partition_key_range, sender); + InputCommand::RegisterPartition { partition, partition_key_range, storage_reader, sender, } => { + self.handle_register_partition(partition, partition_key_range, + storage_reader, sender); }, InputCommand::Abort { partition, invocation_id } => { self.handle_abort_invocation(partition, invocation_id); @@ -448,11 +446,13 @@ where &mut self, partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, ) { self.invocation_state_machine_manager.register_partition( partition, partition_key_range, + storage_reader, sender, ); } @@ -484,10 +484,15 @@ where .resolve_invocation(partition, &invocation_id) .is_none()); + let storage_reader = self + .invocation_state_machine_manager + .partition_storage_reader(partition) + .expect("partition is registered"); self.quota.reserve_slot(); self.start_invocation_task( options, partition, + storage_reader.clone(), invocation_id, journal, InvocationStateMachine::create(invocation_target, options.retry_policy.clone()), @@ -706,7 +711,7 @@ where partition: PartitionLeaderEpoch, invocation_id: InvocationId, ) { - if let Some((sender, ism)) = self + if let Some((sender, _, ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -742,7 +747,7 @@ where invocation_id: InvocationId, entry_indexes: HashSet, ) { - if let Some((sender, ism)) = self + if let Some((sender, _, ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -780,7 +785,7 @@ where invocation_id: InvocationId, error: InvocationTaskError, ) { - if let Some((_, ism)) = self + if let Some((_, _, ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -805,7 +810,7 @@ where partition: PartitionLeaderEpoch, invocation_id: InvocationId, ) { - if let Some((_, mut ism)) = self + if let Some((_, _, mut ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -927,6 +932,7 @@ where &mut self, options: &InvokerOptions, partition: PartitionLeaderEpoch, + storage_reader: SR, invocation_id: InvocationId, journal: InvokeInputJournal, mut ism: InvocationStateMachine, @@ -938,6 +944,7 @@ where partition, invocation_id, ism.invocation_target.clone(), + storage_reader, self.invocation_tasks_tx.clone(), completions_rx, journal, @@ -966,7 +973,7 @@ where ) where FN: FnOnce(&mut InvocationStateMachine), { - if let Some((_, mut ism)) = self + if let Some((_, storage_reader, mut ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -975,9 +982,11 @@ where trace!( restate.invocation.target = %ism.invocation_target, "Going to retry now"); + let storage_reader = storage_reader.clone(); self.start_invocation_task( options, partition, + storage_reader, invocation_id, InvokeInputJournal::NoCachedJournal, ism, @@ -1014,6 +1023,7 @@ mod tests { use bytes::Bytes; use restate_core::TaskKind; use restate_core::TestCoreEnv; + use restate_invoker_api::mocks::EmptyStorageReader; use restate_types::arc_util::Constant; use restate_types::config::InvokerOptionsBuilder; use tempfile::tempdir; @@ -1021,7 +1031,7 @@ mod tests { use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; - use restate_invoker_api::{entry_enricher, journal_reader, state_reader, ServiceHandle}; + use restate_invoker_api::{entry_enricher, ServiceHandle}; use restate_schema_api::deployment::mocks::MockDeploymentMetadataRegistry; use restate_test_util::{check, let_assert}; use restate_types::identifiers::LeaderEpoch; @@ -1036,13 +1046,18 @@ mod tests { const MOCK_PARTITION: PartitionLeaderEpoch = (0, LeaderEpoch::INITIAL); - impl ServiceInner { + impl ServiceInner + where + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, + ::StateIter: Send, + { #[allow(clippy::type_complexity)] fn mock( invocation_task_runner: ITR, concurrency_limit: Option, ) -> ( - mpsc::UnboundedSender, + mpsc::UnboundedSender>, mpsc::UnboundedSender< restate_futures_util::command::Command< RangeInclusive, @@ -1070,26 +1085,35 @@ mod tests { (input_tx, status_tx, service_inner) } - fn register_mock_partition(&mut self) -> mpsc::Receiver + fn register_mock_partition(&mut self, storage_reader: SR) -> mpsc::Receiver where - ITR: InvocationTaskRunner, + ITR: InvocationTaskRunner, { let (partition_tx, partition_rx) = mpsc::channel(1024); - self.handle_register_partition(MOCK_PARTITION, RangeInclusive::new(0, 0), partition_tx); + self.handle_register_partition( + MOCK_PARTITION, + RangeInclusive::new(0, 0), + storage_reader, + partition_tx, + ); partition_rx } } - impl InvocationTaskRunner for F + impl InvocationTaskRunner for F where F: Fn( PartitionLeaderEpoch, InvocationId, InvocationTarget, + SR, mpsc::UnboundedSender, mpsc::UnboundedReceiver, InvokeInputJournal, ) -> Fut, + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, + ::StateIter: Send, Fut: Future + Send + 'static, { fn start_invocation_task( @@ -1098,6 +1122,7 @@ mod tests { partition: PartitionLeaderEpoch, invocation_id: InvocationId, invocation_target: InvocationTarget, + storage_reader: SR, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, input_journal: InvokeInputJournal, @@ -1107,6 +1132,7 @@ mod tests { partition, invocation_id, invocation_target, + storage_reader, invoker_tx, invoker_rx, input_journal, @@ -1137,8 +1163,6 @@ mod tests { restate_service_client::AssumeRoleCacheMode::None, ) .unwrap(), - journal_reader::mocks::EmptyJournalReader, - state_reader::mocks::EmptyStateReader, entry_enricher::mocks::MockEntryEnricher, ); @@ -1160,7 +1184,12 @@ mod tests { let (output_tx, mut output_rx) = mpsc::channel(1); handle - .register_partition(partition_leader_epoch, RangeInclusive::new(0, 0), output_tx) + .register_partition( + partition_leader_epoch, + RangeInclusive::new(0, 0), + EmptyStorageReader, + output_tx, + ) .await .unwrap(); handle @@ -1203,8 +1232,8 @@ mod tests { let invocation_id_2 = InvocationId::mock_random(); let (_invoker_tx, _status_tx, mut service_inner) = - ServiceInner::mock(|_, _, _, _, _, _| ready(()), Some(1)); - let _ = service_inner.register_mock_partition(); + ServiceInner::mock(|_, _, _, _, _, _, _| ready(()), Some(1)); + let _ = service_inner.register_mock_partition(EmptyStorageReader); // Enqueue sid_1 and sid_2 segment_queue @@ -1295,6 +1324,7 @@ mod tests { |partition, invocation_id, _service_id, + _storage_reader, invoker_tx: mpsc::UnboundedSender, _, _| { @@ -1311,7 +1341,7 @@ mod tests { }, Some(2), ); - let _ = service_inner.register_mock_partition(); + let _ = service_inner.register_mock_partition(EmptyStorageReader); // Invoke the service service_inner diff --git a/crates/invoker-impl/src/state_machine_manager.rs b/crates/invoker-impl/src/state_machine_manager.rs index 08a07bd11..8d18965e9 100644 --- a/crates/invoker-impl/src/state_machine_manager.rs +++ b/crates/invoker-impl/src/state_machine_manager.rs @@ -15,24 +15,43 @@ use restate_invoker_api::Effect; use restate_types::identifiers::PartitionKey; /// Tree of [InvocationStateMachine] held by the [Service]. -#[derive(Debug, Default)] -pub(super) struct InvocationStateMachineManager { - partitions: HashMap, +#[derive(Debug)] +pub(super) struct InvocationStateMachineManager { + partitions: HashMap>, +} + +impl Default for InvocationStateMachineManager { + fn default() -> Self { + InvocationStateMachineManager { + partitions: Default::default(), + } + } } #[derive(Debug)] -struct PartitionInvocationStateMachineCoordinator { +struct PartitionInvocationStateMachineCoordinator { output_tx: mpsc::Sender, invocation_state_machines: HashMap, partition_key_range: RangeInclusive, + storage_reader: SR, } -impl InvocationStateMachineManager { +impl InvocationStateMachineManager +where + SR: JournalReader + StateReader + Clone + Send + Sync + 'static, + ::JournalStream: Unpin + Send + 'static, + ::StateIter: Send, +{ #[inline] pub(super) fn has_partition(&self, partition: PartitionLeaderEpoch) -> bool { self.partitions.contains_key(&partition) } + #[inline] + pub(super) fn partition_storage_reader(&self, partition: PartitionLeaderEpoch) -> Option<&SR> { + self.partitions.get(&partition).map(|p| &p.storage_reader) + } + #[inline] pub(super) fn resolve_partition_sender( &self, @@ -59,11 +78,11 @@ impl InvocationStateMachineManager { &mut self, partition: PartitionLeaderEpoch, invocation_id: &InvocationId, - ) -> Option<(&mpsc::Sender, InvocationStateMachine)> { + ) -> Option<(&mpsc::Sender, &SR, InvocationStateMachine)> { self.resolve_partition(partition).and_then(|p| { p.invocation_state_machines .remove(invocation_id) - .map(|ism| (&p.output_tx, ism)) + .map(|ism| (&p.output_tx, &p.storage_reader, ism)) }) } @@ -82,6 +101,7 @@ impl InvocationStateMachineManager { &mut self, partition: PartitionLeaderEpoch, partition_key_range: RangeInclusive, + storage_reader: SR, sender: mpsc::Sender, ) { self.partitions.insert( @@ -90,6 +110,7 @@ impl InvocationStateMachineManager { output_tx: sender, invocation_state_machines: Default::default(), partition_key_range, + storage_reader, }, ); } @@ -134,7 +155,7 @@ impl InvocationStateMachineManager { fn resolve_partition( &mut self, partition: PartitionLeaderEpoch, - ) -> Option<&mut PartitionInvocationStateMachineCoordinator> { + ) -> Option<&mut PartitionInvocationStateMachineCoordinator> { self.partitions.get_mut(&partition) } } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 07009401c..88ecdb108 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -34,7 +34,7 @@ use restate_ingress_dispatcher::IngressDispatcher; use restate_ingress_http::HyperServerIngress; use restate_ingress_kafka::Service as IngressKafkaService; use restate_invoker_impl::{ - ChannelServiceHandle as InvokerChannelServiceHandle, Service as InvokerService, + InvokerHandle as InvokerChannelServiceHandle, Service as InvokerService, }; use restate_metadata_store::MetadataStoreClient; use restate_network::Networking; @@ -51,8 +51,10 @@ use crate::partition_processor_manager::{ }; use restate_types::Version; -type PartitionProcessor = - partition::PartitionProcessor; +type PartitionProcessor = partition::PartitionProcessor< + ProtobufRawEntryCodec, + InvokerChannelServiceHandle>, +>; type ExternalClientIngress = HyperServerIngress; @@ -92,7 +94,6 @@ pub struct Worker { storage_query_postgres: PostgresQueryService, #[allow(clippy::type_complexity)] invoker: InvokerService< - InvokerStorageReader, InvokerStorageReader, EntryEnricher, UpdateableSchema, @@ -163,12 +164,9 @@ impl Worker { .map_as_updateable_owned(|c| &c.worker.storage.rocksdb), ))?; - let invoker_storage_reader = InvokerStorageReader::new(rocksdb_storage.clone()); let invoker = InvokerService::from_options( &config.common.service_client, &config.worker.invoker, - invoker_storage_reader.clone(), - invoker_storage_reader, EntryEnricher::new(schema_view.clone()), schema_view.clone(), )?; diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 8c80e9426..0ca108dd0 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -39,6 +39,8 @@ use restate_types::identifiers::{InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; use restate_wal_protocol::timer::TimerValue; +use super::storage::invoker::InvokerStorageReader; + type PartitionStorage = storage::PartitionStorage; type TimerService = restate_timer::TimerService; @@ -82,7 +84,7 @@ pub(crate) enum LeadershipState { impl LeadershipState where - InvokerInputSender: restate_invoker_api::ServiceHandle, + InvokerInputSender: restate_invoker_api::ServiceHandle>, { #[allow(clippy::too_many_arguments)] pub(crate) fn follower( @@ -213,8 +215,14 @@ where ) -> Result, Error> { let (invoker_tx, invoker_rx) = mpsc::channel(channel_size); + let storage = partition_storage.clone_storage(); invoker_handle - .register_partition(partition_leader_epoch, partition_key_range, invoker_tx) + .register_partition( + partition_leader_epoch, + partition_key_range, + InvokerStorageReader::new(storage), + invoker_tx, + ) .await .map_err(Error::Invoker)?; diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index e2e853a42..5deefeeb9 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -41,6 +41,8 @@ use restate_types::logs::{LogId, Lsn, SequenceNumber}; use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::{Command, Destination, Envelope, Header}; +use self::storage::invoker::InvokerStorageReader; + #[derive(Debug)] pub(super) struct PartitionProcessor { pub partition_id: PartitionId, @@ -59,7 +61,8 @@ pub(super) struct PartitionProcessor { impl PartitionProcessor where RawEntryCodec: restate_types::journal::raw::RawEntryCodec + Default + Debug, - InvokerInputSender: restate_invoker_api::ServiceHandle + Clone, + InvokerInputSender: + restate_invoker_api::ServiceHandle> + Clone, { #[allow(clippy::too_many_arguments)] pub(super) fn new( diff --git a/crates/worker/src/partition/storage/mod.rs b/crates/worker/src/partition/storage/mod.rs index da86bb9ce..f6d49e84c 100644 --- a/crates/worker/src/partition/storage/mod.rs +++ b/crates/worker/src/partition/storage/mod.rs @@ -101,8 +101,13 @@ where + ReadOnlyVirtualObjectStatusTable + ReadOnlyJournalTable + ReadOnlyStateTable + + Clone + Send, { + pub fn clone_storage(&self) -> Storage { + self.storage.clone() + } + pub fn load_inbox_seq_number( &mut self, ) -> impl Future> + Send + '_ { diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index e902e4ead..121732c50 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -8,11 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::partition::storage::invoker::InvokerStorageReader; use crate::PartitionProcessor; use anyhow::Context; use restate_bifrost::Bifrost; use restate_core::{metadata, task_center, ShutdownError, TaskId, TaskKind}; -use restate_invoker_impl::ChannelServiceHandle; +use restate_invoker_impl::InvokerHandle; use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError}; use restate_network::Networking; use restate_storage_rocksdb::RocksDBStorage; @@ -38,7 +39,7 @@ pub struct PartitionProcessorManager { rocksdb_storage: RocksDBStorage, networking: Networking, bifrost: Bifrost, - invoker_handle: ChannelServiceHandle, + invoker_handle: InvokerHandle>, } impl PartitionProcessorManager { @@ -49,7 +50,7 @@ impl PartitionProcessorManager { rocksdb_storage: RocksDBStorage, networking: Networking, bifrost: Bifrost, - invoker_handle: ChannelServiceHandle, + invoker_handle: InvokerHandle>, ) -> Self { Self { updateable_config,