Skip to content

Commit

Permalink
Merge pull request rust-lang#183 from dodomorandi/new_tokio
Browse files Browse the repository at this point in the history
Updated to new tokio API
  • Loading branch information
alexcrichton authored Jan 24, 2019
2 parents ab3d280 + 77e0c41 commit d06d479
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 24 deletions.
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ members = ['systest']
libc = "0.2"
miniz-sys = { path = "miniz-sys", version = "0.1.11", optional = true }
libz-sys = { version = "1.0", optional = true }
tokio-io = { version = "0.1", optional = true }
futures = { version = "0.1", optional = true }
tokio-io = { version = "0.1.11", optional = true }
futures = { version = "0.1.25", optional = true }
miniz_oxide_c_api = { version = "0.2", optional = true, features = ["no_c_export"]}
crc32fast = "1.1"

Expand All @@ -34,7 +34,9 @@ miniz_oxide_c_api = { version = "0.2", features = ["no_c_export"] }
[dev-dependencies]
rand = "0.6"
quickcheck = { version = "0.7", default-features = false }
tokio-core = "0.1"
tokio-io = "0.1.11"
tokio-tcp = "0.1.3"
tokio-threadpool = "0.1.10"

[features]
default = ["miniz-sys"]
Expand Down
4 changes: 2 additions & 2 deletions src/deflate/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl<W: Write> Write for DeflateEncoder<W> {
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for DeflateEncoder<W> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.inner.finish());
self.inner.finish()?;
self.inner.get_mut().shutdown()
}
}
Expand Down Expand Up @@ -334,7 +334,7 @@ impl<W: Write> Write for DeflateDecoder<W> {
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for DeflateDecoder<W> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.inner.finish());
self.inner.finish()?;
self.inner.get_mut().shutdown()
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/gz/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<W: Write> Write for GzEncoder<W> {
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for GzEncoder<W> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.try_finish());
self.try_finish()?;
self.get_mut().shutdown()
}
}
Expand Down Expand Up @@ -388,7 +388,7 @@ impl<W: Write> Write for GzDecoder<W> {
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for GzDecoder<W> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.try_finish());
self.try_finish()?;
self.inner.get_mut().get_mut().shutdown()
}
}
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ extern crate quickcheck;
#[cfg(test)]
extern crate rand;
#[cfg(feature = "tokio")]
#[macro_use]
extern crate tokio_io;

// These must currently agree with here --
Expand Down
4 changes: 2 additions & 2 deletions src/zlib/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl<W: Write> Write for ZlibEncoder<W> {
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for ZlibEncoder<W> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.try_finish());
self.try_finish()?;
self.get_mut().shutdown()
}
}
Expand Down Expand Up @@ -333,7 +333,7 @@ impl<W: Write> Write for ZlibDecoder<W> {
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for ZlibDecoder<W> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.inner.finish());
self.inner.finish()?;
self.inner.get_mut().shutdown()
}
}
Expand Down
30 changes: 16 additions & 14 deletions tests/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
extern crate flate2;
extern crate futures;
extern crate rand;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_tcp;
extern crate tokio_threadpool;

use std::io::{Read, Write};
use std::iter;
Expand All @@ -16,17 +17,15 @@ use flate2::write;
use flate2::Compression;
use futures::Future;
use rand::{thread_rng, Rng};
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tokio_io::io::{copy, shutdown};
use tokio_io::AsyncRead;
use tokio_tcp::TcpStream;

#[test]
fn tcp_stream_echo_pattern() {
const N: u8 = 16;
const M: usize = 16 * 1024;

let mut core = Core::new().unwrap();
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let t = thread::spawn(move || {
Expand Down Expand Up @@ -56,8 +55,7 @@ fn tcp_stream_echo_pattern() {
t.join().unwrap();
});

let handle = core.handle();
let stream = TcpStream::connect(&addr, &handle);
let stream = TcpStream::connect(&addr);
let copy = stream
.and_then(|s| {
let (a, b) = s.split();
Expand All @@ -69,9 +67,12 @@ fn tcp_stream_echo_pattern() {
let (amt, _a, b) = result.unwrap();
assert_eq!(amt, (N as u64) * (M as u64));
shutdown(b).map(|_| ())
});
})
.map_err(|err| panic!("{}", err));

core.run(copy).unwrap();
let threadpool = tokio_threadpool::Builder::new().build();
threadpool.spawn(copy);
threadpool.shutdown().wait().unwrap();
t.join().unwrap();
}

Expand All @@ -81,7 +82,6 @@ fn echo_random() {
.take(1024 * 1024)
.map(|()| thread_rng().gen::<u8>())
.collect::<Vec<_>>();
let mut core = Core::new().unwrap();
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let v2 = v.clone();
Expand Down Expand Up @@ -111,21 +111,23 @@ fn echo_random() {
t.join().unwrap();
});

let handle = core.handle();
let stream = TcpStream::connect(&addr, &handle);
let stream = TcpStream::connect(&addr);
let copy = stream
.and_then(|s| {
let (a, b) = s.split();
let a = read::ZlibDecoder::new(a);
let b = write::DeflateEncoder::new(b, Compression::default());
copy(a, b)
})
.then(|result| {
.then(move |result| {
let (amt, _a, b) = result.unwrap();
assert_eq!(amt, v.len() as u64);
shutdown(b).map(|_| ())
});
})
.map_err(|err| panic!("{}", err));

core.run(copy).unwrap();
let threadpool = tokio_threadpool::Builder::new().build();
threadpool.spawn(copy);
threadpool.shutdown().wait().unwrap();
t.join().unwrap();
}

0 comments on commit d06d479

Please sign in to comment.