Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TryServerStreaming interaction pattern #71

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 8 additions & 270 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@
//! The main entry point is [RpcClient].
use crate::{
map::{ChainedMapper, MapService, Mapper},
message::{BidiStreamingMsg, ClientStreamingMsg, RpcMsg, ServerStreamingMsg},
transport::ConnectionErrors,
Service, ServiceConnection,
};
use futures::{future::BoxFuture, FutureExt, Sink, SinkExt, Stream, StreamExt, TryFutureExt};
use futures::{Sink, SinkExt, Stream};
use pin_project::pin_project;
use std::{
error,
fmt::{self, Debug},
fmt::Debug,
marker::PhantomData,
pin::Pin,
result,
sync::Arc,
task::{Context, Poll},
};
Expand All @@ -28,8 +24,8 @@ pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>
/// for the client DSL. `S` is the service type, `C` is the substream source.
#[derive(Debug)]
pub struct RpcClient<S, C, SInner = S> {
source: C,
map: Arc<dyn MapService<S, SInner>>,
pub(crate) source: C,
pub(crate) map: Arc<dyn MapService<S, SInner>>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Frando one idea of this crate was that you should be able to define new patterns outside the crate. For that this would have to be somehow exposed, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYM by "define new patterns outside the crate"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We got 4 built in interaction patterns: rpc, server streaming, client streaming and bidi streaming. But there can be many more. The interaction patterns are not closely coupled to the server or client transport impls. So you should be able to do all of this outside the main crate. I just added these 4 so it is "batteries included" in a way.

}

impl<S, C: Clone, SInner> Clone for RpcClient<S, C, SInner> {
Expand All @@ -46,9 +42,9 @@ impl<S, C: Clone, SInner> Clone for RpcClient<S, C, SInner> {
#[pin_project]
#[derive(Debug)]
pub struct UpdateSink<S, C, T, SInner = S>(
#[pin] C::SendSink,
PhantomData<T>,
Arc<dyn MapService<S, SInner>>,
#[pin] pub C::SendSink,
pub PhantomData<T>,
pub Arc<dyn MapService<S, SInner>>,
)
where
S: Service,
Expand Down Expand Up @@ -132,137 +128,6 @@ where
map: Arc::new(map),
}
}

/// RPC call to the server, single request, single response
pub async fn rpc<M>(&self, msg: M) -> result::Result<M::Response, RpcClientError<C>>
where
M: RpcMsg<SInner>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, mut recv) = self.source.open_bi().await.map_err(RpcClientError::Open)?;
send.send(msg).await.map_err(RpcClientError::<C>::Send)?;
let res = recv
.next()
.await
.ok_or(RpcClientError::<C>::EarlyClose)?
.map_err(RpcClientError::<C>::RecvError)?;
// keep send alive until we have the answer
drop(send);
let res = self
.map
.res_try_into_inner(res)
.map_err(|_| RpcClientError::DowncastError)?;
M::Response::try_from(res).map_err(|_| RpcClientError::DowncastError)
}

/// Bidi call to the server, request opens a stream, response is a stream
pub async fn server_streaming<M>(
&self,
msg: M,
) -> result::Result<
BoxStreamSync<'static, result::Result<M::Response, StreamingResponseItemError<C>>>,
StreamingResponseError<C>,
>
where
M: ServerStreamingMsg<SInner>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, recv) = self
.source
.open_bi()
.await
.map_err(StreamingResponseError::Open)?;
send.send(msg)
.map_err(StreamingResponseError::<C>::Send)
.await?;
let map = Arc::clone(&self.map);
let recv = recv.map(move |x| match x {
Ok(x) => {
let x = map
.res_try_into_inner(x)
.map_err(|_| StreamingResponseItemError::DowncastError)?;
M::Response::try_from(x).map_err(|_| StreamingResponseItemError::DowncastError)
}
Err(e) => Err(StreamingResponseItemError::RecvError(e)),
});
// keep send alive so the request on the server side does not get cancelled
let recv = Box::pin(DeferDrop(recv, send));
Ok(recv)
}

/// Call to the server that allows the client to stream, single response
pub async fn client_streaming<M>(
&self,
msg: M,
) -> result::Result<
(
UpdateSink<S, C, M::Update, SInner>,
BoxFuture<'static, result::Result<M::Response, ClientStreamingItemError<C>>>,
),
ClientStreamingError<C>,
>
where
M: ClientStreamingMsg<SInner>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, mut recv) = self
.source
.open_bi()
.await
.map_err(ClientStreamingError::Open)?;
send.send(msg).map_err(ClientStreamingError::Send).await?;
let send = UpdateSink::<S, C, M::Update, SInner>(send, PhantomData, Arc::clone(&self.map));
let map = Arc::clone(&self.map);
let recv = async move {
let item = recv
.next()
.await
.ok_or(ClientStreamingItemError::EarlyClose)?;

match item {
Ok(x) => {
let x = map
.res_try_into_inner(x)
.map_err(|_| ClientStreamingItemError::DowncastError)?;
M::Response::try_from(x).map_err(|_| ClientStreamingItemError::DowncastError)
}
Err(e) => Err(ClientStreamingItemError::RecvError(e)),
}
}
.boxed();
Ok((send, recv))
}

/// Bidi call to the server, request opens a stream, response is a stream
pub async fn bidi<M>(
&self,
msg: M,
) -> result::Result<
(
UpdateSink<S, C, M::Update, SInner>,
BoxStreamSync<'static, result::Result<M::Response, BidiItemError<C>>>,
),
BidiError<C>,
>
where
M: BidiStreamingMsg<SInner>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, recv) = self.source.open_bi().await.map_err(BidiError::Open)?;
send.send(msg).await.map_err(BidiError::<C>::Send)?;
let send = UpdateSink(send, PhantomData, Arc::clone(&self.map));
let map = Arc::clone(&self.map);
let recv = Box::pin(recv.map(move |x| match x {
Ok(x) => {
let x = map
.res_try_into_inner(x)
.map_err(|_| BidiItemError::DowncastError)?;
M::Response::try_from(x).map_err(|_| BidiItemError::DowncastError)
}
Err(e) => Err(BidiItemError::RecvError(e)),
}));
Ok((send, recv))
}
}

impl<S, C, SInner> AsRef<C> for RpcClient<S, C, SInner>
Expand All @@ -276,136 +141,9 @@ where
}
}

/// Client error. All client DSL methods return a `Result` with this error type.
#[derive(Debug)]
pub enum RpcClientError<C: ConnectionErrors> {
/// Unable to open a substream at all
Open(C::OpenError),
/// Unable to send the request to the server
Send(C::SendError),
/// Server closed the stream before sending a response
EarlyClose,
/// Unable to receive the response from the server
RecvError(C::RecvError),
/// Unexpected response from the server
DowncastError,
}

impl<C: ConnectionErrors> fmt::Display for RpcClientError<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}

impl<C: ConnectionErrors> error::Error for RpcClientError<C> {}

/// Server error when accepting a bidi request
#[derive(Debug)]
pub enum BidiError<C: ConnectionErrors> {
/// Unable to open a substream at all
Open(C::OpenError),
/// Unable to send the request to the server
Send(C::SendError),
}

impl<C: ConnectionErrors> fmt::Display for BidiError<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}

impl<C: ConnectionErrors> error::Error for BidiError<C> {}

/// Server error when receiving an item for a bidi request
#[derive(Debug)]
pub enum BidiItemError<C: ConnectionErrors> {
/// Unable to receive the response from the server
RecvError(C::RecvError),
/// Unexpected response from the server
DowncastError,
}

impl<C: ConnectionErrors> fmt::Display for BidiItemError<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}

impl<C: ConnectionErrors> error::Error for BidiItemError<C> {}

/// Server error when accepting a client streaming request
#[derive(Debug)]
pub enum ClientStreamingError<C: ConnectionErrors> {
/// Unable to open a substream at all
Open(C::OpenError),
/// Unable to send the request to the server
Send(C::SendError),
}

impl<C: ConnectionErrors> fmt::Display for ClientStreamingError<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}

impl<C: ConnectionErrors> error::Error for ClientStreamingError<C> {}

/// Server error when receiving an item for a client streaming request
#[derive(Debug)]
pub enum ClientStreamingItemError<C: ConnectionErrors> {
/// Connection was closed before receiving the first message
EarlyClose,
/// Unable to receive the response from the server
RecvError(C::RecvError),
/// Unexpected response from the server
DowncastError,
}

impl<C: ConnectionErrors> fmt::Display for ClientStreamingItemError<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}

impl<C: ConnectionErrors> error::Error for ClientStreamingItemError<C> {}

/// Server error when accepting a server streaming request
#[derive(Debug)]
pub enum StreamingResponseError<C: ConnectionErrors> {
/// Unable to open a substream at all
Open(C::OpenError),
/// Unable to send the request to the server
Send(C::SendError),
}

impl<S: ConnectionErrors> fmt::Display for StreamingResponseError<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}

impl<S: ConnectionErrors> error::Error for StreamingResponseError<S> {}

/// Client error when handling responses from a server streaming request
#[derive(Debug)]
pub enum StreamingResponseItemError<S: ConnectionErrors> {
/// Unable to receive the response from the server
RecvError(S::RecvError),
/// Unexpected response from the server
DowncastError,
}

impl<S: ConnectionErrors> fmt::Display for StreamingResponseItemError<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}

impl<S: ConnectionErrors> error::Error for StreamingResponseItemError<S> {}

/// Wrap a stream with an additional item that is kept alive until the stream is dropped
#[pin_project]
struct DeferDrop<S: Stream, X>(#[pin] S, X);
pub(crate) struct DeferDrop<S: Stream, X>(#[pin] pub S, pub X);

impl<S: Stream, X> Stream for DeferDrop<S, X> {
type Item = S::Item;
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ pub use server::RpcServer;
mod macros;
mod map;

pub mod pattern;

/// Requirements for a RPC message
///
/// Even when just using the mem transport, we require messages to be Serializable and Deserializable.
Expand Down
Loading
Loading