Skip to content

Commit

Permalink
Unify RpcStyle message routing
Browse files Browse the repository at this point in the history
Introduces types that makes it easier to perform Rpc-like interactions with Networking.
- `RpcMessage` trait marks messages that carry a `CorrelationId`. A sane default correlation id is provided as `RequestId`.
- `RpcRequest` trait defines request messages and their response types.
- `RpcRouter<T: RpcRequest>` enables sending rpc request and awaiting responses with auto eviction of dropped requests.
- `ResponseTracker` is a helper that manages tracking tokens for in-flight requests, this can be used in the future to replace large portions of IngressDispatcher.
- Macros to help define RPC messages to reduce code noise in node-protocol
  • Loading branch information
AhmedSoliman committed May 14, 2024
1 parent 210c0e9 commit 157fc63
Show file tree
Hide file tree
Showing 10 changed files with 623 additions and 80 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 5 additions & 12 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@

use crate::error::IngressDispatchError;
use crate::{
IngressCorrelationId, IngressDispatcherRequest, IngressDispatcherRequestInner,
IngressDispatcherResponse, IngressRequestMode, IngressResponseSender,
IngressDispatcherRequest, IngressDispatcherRequestInner, IngressDispatcherResponse,
IngressRequestMode, IngressResponseSender,
};
use dashmap::DashMap;
use restate_bifrost::Bifrost;
use restate_core::metadata;
use restate_core::network::MessageHandler;
use restate_node_protocol::codec::Targeted;
use restate_node_protocol::ingress::IngressMessage;
use restate_node_protocol::ingress::{IngressCorrelationId, IngressMessage};
use restate_node_protocol::RpcMessage;
use restate_storage_api::deduplication_table::DedupInformation;
use restate_types::identifiers::{PartitionKey, WithPartitionKey};
use restate_types::message::MessageIndex;
Expand Down Expand Up @@ -132,15 +133,7 @@ impl MessageHandler for IngressDispatcher {
trace!("Processing message '{}' from '{}'", msg.kind(), peer);
match msg {
IngressMessage::InvocationResponse(invocation_response) => {
let correlation_id = invocation_response
.idempotency_id
.as_ref()
.map(|idempotency_id| {
IngressCorrelationId::IdempotencyId(idempotency_id.clone())
})
.unwrap_or_else(|| {
IngressCorrelationId::InvocationId(invocation_response.invocation_id)
});
let correlation_id = invocation_response.correlation_id();
if let Some((_, sender)) = self.state.waiting_responses.remove(&correlation_id) {
let dispatcher_response = IngressDispatcherResponse {
// TODO we need to add back the expiration time for idempotent results
Expand Down
9 changes: 1 addition & 8 deletions crates/ingress-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use bytes::Bytes;
use restate_core::metadata;
pub use restate_node_protocol::ingress::IngressCorrelationId;
use restate_schema_api::subscription::{EventReceiverServiceType, Sink, Subscription};
use restate_types::identifiers::{
partitioner, IdempotencyId, InvocationId, PartitionKey, WithPartitionKey,
Expand All @@ -31,14 +32,6 @@ pub use dispatcher::{DispatchIngressRequest, IngressDispatcher};
pub type IngressResponseSender = oneshot::Sender<IngressDispatcherResponse>;
pub type IngressResponseReceiver = oneshot::Receiver<IngressDispatcherResponse>;

// TODO we could eventually remove this type and replace it with something simpler once
// /~https://github.com/restatedev/restate/issues/1329 is in place
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IngressCorrelationId {
InvocationId(InvocationId),
IdempotencyId(IdempotencyId),
}

#[derive(Debug)]
enum IngressDispatcherRequestInner {
Invoke(ServiceInvocation),
Expand Down
1 change: 1 addition & 0 deletions crates/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
bincode = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
drain = { workspace = true }
enum-map = { workspace = true }
enumset = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod error;
mod handshake;
pub(crate) mod metric_definitions;
mod networking;
pub mod rpc_router;

pub use connection::ConnectionSender;
pub use connection_manager::ConnectionManager;
Expand Down
Loading

0 comments on commit 157fc63

Please sign in to comment.