Skip to content

Commit

Permalink
feat(lib): Upgrade to Tokio 1.0 (#2369)
Browse files Browse the repository at this point in the history
Closes #2370
  • Loading branch information
seanmonstar authored Dec 23, 2020
1 parent dad5c87 commit fad42ac
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 146 deletions.
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ include = [
]

[dependencies]
bytes = "0.6"
bytes = "1"
futures-core = { version = "0.3", default-features = false }
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
http-body = { git = "/~https://github.com/hyperium/http-body" }
http-body = "0.4"
httpdate = "0.3"
httparse = "1.0"
h2 = { git = "/~https://github.com/hyperium/h2", optional = true }
h2 = { version = "0.3", optional = true }
itoa = "0.4.1"
tracing = { version = "0.1", default-features = false, features = ["std"] }
pin-project = "1.0"
tower-service = "0.3"
tokio = { version = "0.3.4", features = ["sync", "stream"] }
tokio = { version = "1", features = ["sync"] }
want = "0.3"

# Optional
Expand All @@ -51,7 +51,7 @@ spmc = "0.3"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tokio = { version = "0.3", features = [
tokio = { version = "1", features = [
"fs",
"macros",
"io-std",
Expand All @@ -62,8 +62,8 @@ tokio = { version = "0.3", features = [
"time",
"test-util",
] }
tokio-test = "0.3"
tokio-util = { version = "0.5", features = ["codec"] }
tokio-test = "0.4"
tokio-util = { version = "0.6", features = ["codec"] }
tower-util = "0.3"
url = "1.0"

Expand Down
2 changes: 1 addition & 1 deletion src/body/to_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ where
let second = if let Some(buf) = body.data().await {
buf?
} else {
return Ok(first.copy_to_bytes(first.bytes().len()));
return Ok(first.copy_to_bytes(first.remaining()));
};

// With more than 1 buf, we gotta flatten into a Vec first.
Expand Down
5 changes: 4 additions & 1 deletion src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,11 @@ impl ConnectingTcp<'_> {
let fallback_fut = fallback.remote.connect(self.config);
futures_util::pin_mut!(fallback_fut);

let fallback_delay = fallback.delay;
futures_util::pin_mut!(fallback_delay);

let (result, future) =
match futures_util::future::select(preferred_fut, fallback.delay).await {
match futures_util::future::select(preferred_fut, fallback_delay).await {
Either::Left((result, _fallback_delay)) => {
(result, Either::Right(fallback_fut))
}
Expand Down
12 changes: 6 additions & 6 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "http2")]
use std::future::Future;

use tokio::stream::Stream;
use futures_util::FutureExt;
use tokio::sync::{mpsc, oneshot};

use crate::common::{task, Pin, Poll};
Expand Down Expand Up @@ -150,8 +150,8 @@ impl<T, U> Receiver<T, U> {
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<(T, Callback<T, U>)>> {
let this = self.project();
match this.inner.poll_next(cx) {
let mut this = self.project();
match this.inner.poll_recv(cx) {
Poll::Ready(item) => {
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
}
Expand All @@ -170,9 +170,9 @@ impl<T, U> Receiver<T, U> {

#[cfg(feature = "http1")]
pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
match self.inner.try_recv() {
Ok(mut env) => env.0.take(),
Err(_) => None,
match self.inner.recv().now_or_never() {
Some(Some(mut env)) => env.0.take(),
_ => None,
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,6 @@ impl<T: Poolable + 'static> Future for IdleTask<T> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
use tokio::stream::Stream;
let mut this = self.project();
loop {
match this.pool_drop_notifier.as_mut().poll(cx) {
Expand All @@ -743,7 +742,7 @@ impl<T: Poolable + 'static> Future for IdleTask<T> {
}
}

ready!(this.interval.as_mut().poll_next(cx));
ready!(this.interval.as_mut().poll_tick(cx));

if let Some(inner) = this.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
Expand Down
8 changes: 4 additions & 4 deletions src/common/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl<T: Buf> Buf for BufList<T> {
}

#[inline]
fn bytes(&self) -> &[u8] {
self.bufs.front().map(Buf::bytes).unwrap_or_default()
fn chunk(&self) -> &[u8] {
self.bufs.front().map(Buf::chunk).unwrap_or_default()
}

#[inline]
Expand All @@ -57,13 +57,13 @@ impl<T: Buf> Buf for BufList<T> {
}

#[inline]
fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
if dst.is_empty() {
return 0;
}
let mut vecs = 0;
for buf in &self.bufs {
vecs += buf.bytes_vectored(&mut dst[vecs..]);
vecs += buf.chunks_vectored(&mut dst[vecs..]);
if vecs == dst.len() {
break;
}
Expand Down
1 change: 0 additions & 1 deletion src/common/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
mod rewind;

pub(crate) use self::rewind::Rewind;
pub(crate) const MAX_WRITEV_BUFS: usize = 64;
22 changes: 11 additions & 11 deletions src/proto/h1/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ where
}

#[inline]
fn bytes(&self) -> &[u8] {
fn chunk(&self) -> &[u8] {
match self.kind {
BufKind::Exact(ref b) => b.bytes(),
BufKind::Limited(ref b) => b.bytes(),
BufKind::Chunked(ref b) => b.bytes(),
BufKind::ChunkedEnd(ref b) => b.bytes(),
BufKind::Exact(ref b) => b.chunk(),
BufKind::Limited(ref b) => b.chunk(),
BufKind::Chunked(ref b) => b.chunk(),
BufKind::ChunkedEnd(ref b) => b.chunk(),
}
}

Expand All @@ -249,12 +249,12 @@ where
}

#[inline]
fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
match self.kind {
BufKind::Exact(ref b) => b.bytes_vectored(dst),
BufKind::Limited(ref b) => b.bytes_vectored(dst),
BufKind::Chunked(ref b) => b.bytes_vectored(dst),
BufKind::ChunkedEnd(ref b) => b.bytes_vectored(dst),
BufKind::Exact(ref b) => b.chunks_vectored(dst),
BufKind::Limited(ref b) => b.chunks_vectored(dst),
BufKind::Chunked(ref b) => b.chunks_vectored(dst),
BufKind::ChunkedEnd(ref b) => b.chunks_vectored(dst),
}
}
}
Expand Down Expand Up @@ -295,7 +295,7 @@ impl Buf for ChunkSize {
}

#[inline]
fn bytes(&self) -> &[u8] {
fn chunk(&self) -> &[u8] {
&self.bytes[self.pos.into()..self.len.into()]
}

Expand Down
25 changes: 13 additions & 12 deletions src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ where
self.read_buf.reserve(next);
}

let dst = self.read_buf.bytes_mut();
let dst = self.read_buf.chunk_mut();
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
let mut buf = ReadBuf::uninit(dst);
match Pin::new(&mut self.io).poll_read(cx, &mut buf) {
Expand Down Expand Up @@ -231,10 +231,11 @@ where
return self.poll_flush_flattened(cx);
}

const MAX_WRITEV_BUFS: usize = 64;
loop {
let n = {
let mut iovs = [IoSlice::new(&[]); crate::common::io::MAX_WRITEV_BUFS];
let len = self.write_buf.bytes_vectored(&mut iovs);
let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
let len = self.write_buf.chunks_vectored(&mut iovs);
ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
};
// TODO(eliza): we have to do this manually because
Expand Down Expand Up @@ -262,7 +263,7 @@ where
/// that skips some bookkeeping around using multiple buffers.
fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
loop {
let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.bytes()))?;
let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
debug!("flushed {} bytes", n);
self.write_buf.headers.advance(n);
if self.write_buf.headers.remaining() == 0 {
Expand Down Expand Up @@ -433,7 +434,7 @@ impl<T: AsRef<[u8]>> Buf for Cursor<T> {
}

#[inline]
fn bytes(&self) -> &[u8] {
fn chunk(&self) -> &[u8] {
&self.bytes.as_ref()[self.pos..]
}

Expand Down Expand Up @@ -487,7 +488,7 @@ where
//but accomplishes the same result.
loop {
let adv = {
let slice = buf.bytes();
let slice = buf.chunk();
if slice.is_empty() {
return;
}
Expand Down Expand Up @@ -534,12 +535,12 @@ impl<B: Buf> Buf for WriteBuf<B> {
}

#[inline]
fn bytes(&self) -> &[u8] {
let headers = self.headers.bytes();
fn chunk(&self) -> &[u8] {
let headers = self.headers.chunk();
if !headers.is_empty() {
headers
} else {
self.queue.bytes()
self.queue.chunk()
}
}

Expand All @@ -559,9 +560,9 @@ impl<B: Buf> Buf for WriteBuf<B> {
}

#[inline]
fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
let n = self.headers.bytes_vectored(dst);
self.queue.bytes_vectored(&mut dst[n..]) + n
fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
let n = self.headers.chunks_vectored(dst);
self.queue.chunks_vectored(&mut dst[n..]) + n
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/proto/h2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ impl<B: Buf> Buf for SendBuf<B> {
}

#[inline]
fn bytes(&self) -> &[u8] {
self.0.as_ref().map(|b| b.bytes()).unwrap_or(&[])
fn chunk(&self) -> &[u8] {
self.0.as_ref().map(|b| b.chunk()).unwrap_or(&[])
}

#[inline]
Expand All @@ -268,7 +268,7 @@ impl<B: Buf> Buf for SendBuf<B> {
}
}

fn bytes_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
self.0.as_ref().map(|b| b.bytes_vectored(dst)).unwrap_or(0)
fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
self.0.as_ref().map(|b| b.chunks_vectored(dst)).unwrap_or(0)
}
}
10 changes: 5 additions & 5 deletions src/proto/h2/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
interval,
timeout: config.keep_alive_timeout,
while_idle: config.keep_alive_while_idle,
timer: tokio::time::sleep(interval),
timer: Box::pin(tokio::time::sleep(interval)),
state: KeepAliveState::Init,
});

Expand Down Expand Up @@ -156,7 +156,7 @@ struct KeepAlive {
while_idle: bool,

state: KeepAliveState,
timer: Sleep,
timer: Pin<Box<Sleep>>,
}

#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -441,7 +441,7 @@ impl KeepAlive {

self.state = KeepAliveState::Scheduled;
let interval = shared.last_read_at() + self.interval;
self.timer.reset(interval);
self.timer.as_mut().reset(interval);
}
KeepAliveState::PingSent => {
if shared.is_ping_sent() {
Expand All @@ -450,7 +450,7 @@ impl KeepAlive {

self.state = KeepAliveState::Scheduled;
let interval = shared.last_read_at() + self.interval;
self.timer.reset(interval);
self.timer.as_mut().reset(interval);
}
KeepAliveState::Scheduled => (),
}
Expand All @@ -472,7 +472,7 @@ impl KeepAlive {
shared.send_ping();
self.state = KeepAliveState::PingSent;
let timeout = Instant::now() + self.timeout;
self.timer.reset(timeout);
self.timer.as_mut().reset(timeout);
}
KeepAliveState::Init | KeepAliveState::PingSent => (),
}
Expand Down
8 changes: 4 additions & 4 deletions src/server/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct AddrIncoming {
sleep_on_errors: bool,
tcp_keepalive_timeout: Option<Duration>,
tcp_nodelay: bool,
timeout: Option<Sleep>,
timeout: Option<Pin<Box<Sleep>>>,
}

impl AddrIncoming {
Expand Down Expand Up @@ -160,9 +160,9 @@ impl AddrIncoming {
error!("accept error: {}", e);

// Sleep 1s.
let mut timeout = tokio::time::sleep(Duration::from_secs(1));
let mut timeout = Box::pin(tokio::time::sleep(Duration::from_secs(1)));

match Pin::new(&mut timeout).poll(cx) {
match timeout.as_mut().poll(cx) {
Poll::Ready(()) => {
// Wow, it's been a second already? Ok then...
continue;
Expand Down Expand Up @@ -263,7 +263,7 @@ mod addr_stream {
pub fn poll_peek(
&mut self,
cx: &mut task::Context<'_>,
buf: &mut [u8],
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<usize>> {
self.inner.poll_peek(cx, buf)
}
Expand Down
Loading

0 comments on commit fad42ac

Please sign in to comment.