Skip to content

Commit

Permalink
Use updated Service trait (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Dec 4, 2024
1 parent 3c27991 commit 87b6161
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 147 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [3.3.0] - 2024-12-04

* Use updated Service trait

## [3.2.1] - 2024-12-03

* Handle unknown links
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "3.2.1"
version = "3.3.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -25,8 +25,8 @@ default = []
frame-trace = []

[dependencies]
ntex = "2.9"
ntex-util = "2.7"
ntex = "2.10"
ntex-io = "2.9.1"
ntex-amqp-codec = "0.9"

bitflags = "2"
Expand Down
219 changes: 75 additions & 144 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
use std::task::{Context, Poll};
use std::{cell, cmp, future::poll_fn, future::Future, marker, pin::Pin, rc::Rc};
use std::{cell, cmp, future::poll_fn, future::Future, marker, pin::Pin};

use ntex::service::{Pipeline, PipelineBinding, PipelineCall, Service, ServiceCtx};
use ntex::time::{sleep, Millis, Sleep};
use ntex::util::{ready, select, Either};
use ntex::util::{ready, Either};
use ntex::{io::DispatchItem, rt::spawn, task::LocalWaker};

use crate::codec::{protocol::Frame, AmqpCodec, AmqpFrame};
use crate::error::{AmqpDispatcherError, AmqpProtocolError, Error};
use crate::{connection::Connection, types, ControlFrame, ControlFrameKind, ReceiverLink};

/// Amqp server dispatcher service.
pub(crate) struct Dispatcher<Sr: Service<types::Message>, Ctl: Service<ControlFrame>>(
Rc<DispatcherInner<Sr, Ctl>>,
);

struct DispatcherInner<Sr: Service<types::Message>, Ctl: Service<ControlFrame>> {
pub(crate) struct Dispatcher<Sr: Service<types::Message>, Ctl: Service<ControlFrame>> {
sink: Connection,
service: PipelineBinding<Sr, types::Message>,
ctl_service: PipelineBinding<Ctl, ControlFrame>,
ctl_fut: cell::RefCell<Vec<(ControlFrame, PipelineCall<Ctl, ControlFrame>)>>,
ctl_error: cell::Cell<Option<AmqpDispatcherError>>,
ctl_error_waker: LocalWaker,
idle_sleep: Sleep,
idle_timeout: Millis,
stopped: cell::Cell<bool>,
}

impl<Sr, Ctl> Dispatcher<Sr, Ctl>
Expand All @@ -39,25 +35,22 @@ where
idle_timeout: Millis,
) -> Self {
let idle_timeout = Millis(cmp::min(idle_timeout.0 >> 1, 1000));
let inner = Rc::new(DispatcherInner {
Dispatcher {
sink,
idle_timeout,
service: service.bind(),
ctl_service: ctl_service.bind(),
ctl_fut: cell::RefCell::new(Vec::new()),
ctl_error: cell::Cell::new(None),
ctl_error_waker: LocalWaker::default(),
idle_sleep: sleep(idle_timeout),
stopped: cell::Cell::new(false),
});

let disp = Dispatcher(inner);
disp.start_idle_timer();
disp.start_control_queue();
disp
}
}

fn call_control_service(&self, frame: ControlFrame) {
self.0.sink.get_control_queue().enqueue_frame(frame);
let fut = self.ctl_service.call(frame.clone());
self.ctl_fut.borrow_mut().push((frame, fut));
self.sink.get_control_queue().waker.wake();
}
}

Expand All @@ -72,32 +65,32 @@ where

async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
poll_fn(|cx| {
if let Some(err) = self.0.ctl_error.take() {
log::error!("{}: Control service failed: {:?}", self.0.sink.tag(), err);
let _ = self.0.sink.close();
if let Some(err) = self.ctl_error.take() {
log::error!("{}: Control service failed: {:?}", self.sink.tag(), err);
let _ = self.sink.close();
return Poll::Ready(Err(err));
}

// check readiness
let service_poll = self.0.service.poll_ready(cx).map_err(|err| {
let service_poll = self.service.poll_ready(cx).map_err(|err| {
let err = Error::from(err);
log::error!(
"{}: Publish service readiness check failed: {:?}",
self.0.sink.tag(),
self.sink.tag(),
err
);
let _ = self.0.sink.close_with_error(err);
let _ = self.sink.close_with_error(err);
AmqpDispatcherError::Service
})?;

let ctl_service_poll = self.0.ctl_service.poll_ready(cx).map_err(|err| {
let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| {
let err = Error::from(err);
log::error!(
"{}: Control service readiness check failed: {:?}",
self.0.sink.tag(),
self.sink.tag(),
err
);
let _ = self.0.sink.close_with_error(err);
let _ = self.sink.close_with_error(err);
AmqpDispatcherError::Service
})?;

Expand All @@ -110,39 +103,69 @@ where
.await
}

async fn not_ready(&self) {
select(
select(
poll_fn(|cx| self.0.service.poll_not_ready(cx)),
poll_fn(|cx| self.0.ctl_service.poll_not_ready(cx)),
),
poll_fn(|cx| {
self.0.ctl_error_waker.register(cx.waker());
if let Some(err) = self.0.ctl_error.take() {
self.0.ctl_error.set(Some(err));
Poll::Ready(())
} else {
Poll::Pending
fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
let mut futs = self.ctl_fut.borrow_mut();
let queue = self.sink.get_control_queue();
queue.waker.register(cx.waker());

// enqueue pending control frames
queue.pending.borrow_mut().drain(..).for_each(|frame| {
let fut = self.ctl_service.call(frame.clone());
futs.push((frame, fut));
});

// process control frame
let mut idx = 0;
while futs.len() > idx {
let item = &mut futs[idx];
let res = match Pin::new(&mut item.1).poll(cx) {
Poll::Pending => {
idx += 1;
continue;
}
}),
)
.await;
Poll::Ready(res) => res,
};
let (frame, _) = futs.swap_remove(idx);
let result = match res {
Ok(_) => self.handle_control_frame(&frame, None),
Err(e) => self.handle_control_frame(&frame, Some(e.into())),
};

if let Err(err) = result {
self.ctl_error.set(Some(err));
self.ctl_error_waker.wake();
return Ok(());
}
}

// handle idle timeout
if self.idle_timeout.non_zero() {
if self.idle_sleep.poll_elapsed(cx).is_ready() {
log::trace!(
"{}: Send keep-alive ping, timeout: {:?} secs",
self.sink.tag(),
self.idle_timeout
);
self.sink.post_frame(AmqpFrame::new(0, Frame::Empty));
self.idle_sleep.reset(self.idle_timeout);
}
}

Check warning on line 152 in src/dispatcher.rs

View workflow job for this annotation

GitHub Actions / clippy

this `if` statement can be collapsed

warning: this `if` statement can be collapsed --> src/dispatcher.rs:142:9 | 142 | / if self.idle_timeout.non_zero() { 143 | | if self.idle_sleep.poll_elapsed(cx).is_ready() { 144 | | log::trace!( 145 | | "{}: Send keep-alive ping, timeout: {:?} secs", ... | 151 | | } 152 | | } | |_________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_if = note: `#[warn(clippy::collapsible_if)]` on by default help: collapse nested if block | 142 ~ if self.idle_timeout.non_zero() && self.idle_sleep.poll_elapsed(cx).is_ready() { 143 + log::trace!( 144 + "{}: Send keep-alive ping, timeout: {:?} secs", 145 + self.sink.tag(), 146 + self.idle_timeout 147 + ); 148 + self.sink.post_frame(AmqpFrame::new(0, Frame::Empty)); 149 + self.idle_sleep.reset(self.idle_timeout); 150 + } |

Ok(())
}

async fn shutdown(&self) {
self.0
.sink
self.sink
.0
.get_mut()
.set_error(AmqpProtocolError::Disconnected);
let _ = self
.0
.ctl_service
.call(ControlFrame::new_kind(ControlFrameKind::Closed))
.await;

self.0.service.shutdown().await;
self.0.ctl_service.shutdown().await;
self.service.shutdown().await;
self.ctl_service.shutdown().await;
}

async fn call(
Expand All @@ -153,10 +176,9 @@ where
match request {
DispatchItem::Item(frame) => {
#[cfg(feature = "frame-trace")]
log::trace!("{}: incoming: {:#?}", self.0.sink.tag(), frame);
log::trace!("{}: incoming: {:#?}", self.sink.tag(), frame);

let action = match self
.0
.sink
.handle_frame(frame)
.map_err(AmqpDispatcherError::Protocol)
Expand All @@ -167,10 +189,9 @@ where

match action {
types::Action::Transfer(link) => {
if self.0.sink.is_opened() {
if self.sink.is_opened() {
let lnk = link.clone();
if let Err(e) =
self.0.service.call(types::Message::Transfer(link)).await
if let Err(e) = self.service.call(types::Message::Transfer(link)).await
{
let e = Error::from(e);
log::trace!("Service error {:?}", e);
Expand Down Expand Up @@ -205,7 +226,7 @@ where
}
types::Action::DetachReceiver(link, frm) => {
let lnk = link.clone();
let fut = self.0.service.call(types::Message::Detached(lnk));
let fut = self.service.call(types::Message::Detached(lnk));
let _ = spawn(async move {
let _ = fut.await;
});
Expand All @@ -223,7 +244,7 @@ where
})
.collect();

let fut = self.0.service.call(types::Message::DetachedAll(receivers));
let fut = self.service.call(types::Message::DetachedAll(receivers));
let _ = spawn(async move {
let _ = fut.await;
});
Expand Down Expand Up @@ -271,96 +292,6 @@ where
}

impl<Sr, Ctl> Dispatcher<Sr, Ctl>
where
Sr: Service<types::Message, Response = ()> + 'static,
Ctl: Service<ControlFrame, Response = ()> + 'static,
Error: From<Sr::Error> + From<Ctl::Error>,
{
fn start_idle_timer(&self) {
if self.0.idle_timeout.non_zero() {
let slf = self.0.clone();
ntex::rt::spawn(async move {
poll_fn(|cx| slf.idle_sleep.poll_elapsed(cx)).await;
if slf.stopped.get() || !slf.sink.is_opened() {
return;
}
log::trace!(
"{}: Send keep-alive ping, timeout: {:?} secs",
slf.sink.tag(),
slf.idle_timeout
);
slf.sink.post_frame(AmqpFrame::new(0, Frame::Empty));
slf.idle_sleep.reset(slf.idle_timeout);
});
}
}

fn start_control_queue(&self) {
let slf = self.0.clone();
let queue = self.0.sink.get_control_queue().clone();
let on_close = self.0.sink.get_ref().on_close();
ntex::rt::spawn(async move {
let mut futs: Vec<(ControlFrame, PipelineCall<Ctl, ControlFrame>)> = Vec::new();
poll_fn(|cx| {
queue.waker.register(cx.waker());

// enqueue pending control frames
queue.pending.borrow_mut().drain(..).for_each(|frame| {
let fut = slf.ctl_service.call(frame.clone());
futs.push((frame, fut));
});

// process control frame
let mut idx = 0;
while futs.len() > idx {
let item = &mut futs[idx];
let res = match Pin::new(&mut item.1).poll(cx) {
Poll::Pending => {
idx += 1;
continue;
}
Poll::Ready(res) => res,
};
let (frame, _) = futs.swap_remove(idx);
let result = match res {
Ok(_) => slf.handle_control_frame(&frame, None),
Err(e) => slf.handle_control_frame(&frame, Some(e.into())),
};

if let Err(err) = result {
slf.ctl_error.set(Some(err));
slf.ctl_error_waker.wake();
return Poll::Ready(());
}
}

if !slf.sink.is_opened() {
let _ = on_close.poll_ready(cx);
}

if !futs.is_empty() || !slf.stopped.get() || slf.sink.is_opened() {
Poll::Pending
} else {
Poll::Ready(())
}
})
.await;
});
}
}

impl<Sr, Ctl> Drop for Dispatcher<Sr, Ctl>
where
Sr: Service<types::Message>,
Ctl: Service<ControlFrame>,
{
fn drop(&mut self) {
self.0.stopped.set(true);
self.0.idle_sleep.elapse();
}
}

impl<Sr, Ctl> DispatcherInner<Sr, Ctl>
where
Sr: Service<types::Message, Response = ()> + 'static,
Ctl: Service<ControlFrame, Response = ()> + 'static,
Expand Down

0 comments on commit 87b6161

Please sign in to comment.