diff --git a/src/reactor/background.rs b/src/reactor/background.rs index 07129474..96392b96 100644 --- a/src/reactor/background.rs +++ b/src/reactor/background.rs @@ -1,6 +1,6 @@ use super::{Handle, Reactor}; -use futures::task::{AtomicWaker, LocalWaker}; +use futures::task::{AtomicWaker, Waker}; use futures::{executor, Future, Poll}; use log::debug; @@ -107,8 +107,8 @@ impl Drop for Background { impl Future for Shutdown { type Output = Result<(), ()>; - fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - self.inner.shared.shutdown_task.register(lw); + fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { + self.inner.shared.shutdown_task.register(waker); if !self.inner.is_shutdown() { return Poll::Pending; diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index 06fe8e26..7c7ec1f4 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -22,7 +22,7 @@ use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use std::{fmt, usize}; -use futures::task::{AtomicWaker, LocalWaker}; +use futures::task::{AtomicWaker, Waker}; use log::{debug, log_enabled, trace, Level}; use mio::event::Evented; use slab::Slab; @@ -507,20 +507,20 @@ impl Inner { } /// Registers interest in the I/O resource associated with `token`. - fn register(&self, lw: &LocalWaker, token: usize, dir: Direction) { + fn register(&self, waker: &Waker, token: usize, dir: Direction) { debug!("scheduling direction for: {}", token); let io_dispatch = self.io_dispatch.read(); let sched = io_dispatch.get(token).unwrap(); - let (waker, ready) = match dir { + let (atomic_waker, ready) = match dir { Direction::Read => (&sched.reader, !mio::Ready::writable()), Direction::Write => (&sched.writer, mio::Ready::writable()), }; - waker.register(lw); + atomic_waker.register(waker); if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { - waker.wake(); + atomic_waker.wake(); } } } diff --git a/src/reactor/poll_evented.rs b/src/reactor/poll_evented.rs index 7b09f451..807cabc8 100644 --- a/src/reactor/poll_evented.rs +++ b/src/reactor/poll_evented.rs @@ -1,7 +1,7 @@ use super::Registration; use futures::io::{AsyncRead, AsyncWrite}; -use futures::task::LocalWaker; +use futures::task::Waker; use futures::{ready, Poll}; use mio; use mio::event::Evented; @@ -160,7 +160,7 @@ where /// cleared by calling [`clear_read_ready`]. /// /// [`clear_read_ready`]: #method.clear_read_ready - pub fn poll_read_ready(&self, lw: &LocalWaker) -> Poll> { + pub fn poll_read_ready(&self, waker: &Waker) -> Poll> { self.register()?; // Load cached & encoded readiness. @@ -175,7 +175,7 @@ where // stream. This happens in a loop to ensure that the stream gets // drained. loop { - let ready = ready!(self.inner.registration.poll_read_ready(lw)?); + let ready = ready!(self.inner.registration.poll_read_ready(waker)?); cached |= ready.as_usize(); // Update the cache store @@ -207,14 +207,14 @@ where /// /// The `mask` argument specifies the readiness bits to clear. This may not /// include `writable` or `hup`. - pub fn clear_read_ready(&self, lw: &LocalWaker) -> io::Result<()> { + pub fn clear_read_ready(&self, waker: &Waker) -> io::Result<()> { self.inner .read_readiness .fetch_and(!mio::Ready::readable().as_usize(), Relaxed); - if self.poll_read_ready(lw)?.is_ready() { + if self.poll_read_ready(waker)?.is_ready() { // Notify the current task - lw.wake(); + waker.wake(); } Ok(()) @@ -239,7 +239,7 @@ where /// /// * `ready` contains bits besides `writable` and `hup`. /// * called from outside of a task context. - pub fn poll_write_ready(&self, lw: &LocalWaker) -> Poll> { + pub fn poll_write_ready(&self, waker: &Waker) -> Poll> { self.register()?; // Load cached & encoded readiness. @@ -254,7 +254,7 @@ where // stream. This happens in a loop to ensure that the stream gets // drained. loop { - let ready = ready!(self.inner.registration.poll_write_ready(lw)?); + let ready = ready!(self.inner.registration.poll_write_ready(waker)?); cached |= ready.as_usize(); // Update the cache store @@ -290,14 +290,14 @@ where /// # Panics /// /// This function will panic if called from outside of a task context. - pub fn clear_write_ready(&self, lw: &LocalWaker) -> io::Result<()> { + pub fn clear_write_ready(&self, waker: &Waker) -> io::Result<()> { self.inner .write_readiness .fetch_and(!mio::Ready::writable().as_usize(), Relaxed); - if self.poll_write_ready(lw)?.is_ready() { + if self.poll_write_ready(waker)?.is_ready() { // Notify the current task - lw.wake(); + waker.wake(); } Ok(()) @@ -318,13 +318,13 @@ impl AsyncRead for PollEvented where E: Evented + Read, { - fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll> { - ready!(self.poll_read_ready(lw)?); + fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll> { + ready!(self.poll_read_ready(waker)?); let r = self.get_mut().read(buf); if is_wouldblock(&r) { - self.clear_read_ready(lw)?; + self.clear_read_ready(waker)?; Poll::Pending } else { Poll::Ready(r) @@ -336,33 +336,33 @@ impl AsyncWrite for PollEvented where E: Evented + Write, { - fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll> { - ready!(self.poll_write_ready(lw)?); + fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll> { + ready!(self.poll_write_ready(waker)?); let r = self.get_mut().write(buf); if is_wouldblock(&r) { - self.clear_write_ready(lw)?; + self.clear_write_ready(waker)?; Poll::Pending } else { Poll::Ready(r) } } - fn poll_flush(&mut self, lw: &LocalWaker) -> Poll> { - ready!(self.poll_write_ready(lw)?); + fn poll_flush(&mut self, waker: &Waker) -> Poll> { + ready!(self.poll_write_ready(waker)?); let r = self.get_mut().flush(); if is_wouldblock(&r) { - self.clear_write_ready(lw)?; + self.clear_write_ready(waker)?; Poll::Pending } else { Poll::Ready(r) } } - fn poll_close(&mut self, _: &LocalWaker) -> Poll> { + fn poll_close(&mut self, _: &Waker) -> Poll> { Poll::Ready(Ok(())) } } @@ -374,13 +374,13 @@ where E: Evented, &'a E: Read, { - fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll> { - ready!(self.poll_read_ready(lw)?); + fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll> { + ready!(self.poll_read_ready(waker)?); let r = self.get_ref().read(buf); if is_wouldblock(&r) { - self.clear_read_ready(lw)?; + self.clear_read_ready(waker)?; Poll::Pending } else { Poll::Ready(r) @@ -393,33 +393,33 @@ where E: Evented, &'a E: Write, { - fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll> { - ready!(self.poll_write_ready(lw)?); + fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll> { + ready!(self.poll_write_ready(waker)?); let r = self.get_ref().write(buf); if is_wouldblock(&r) { - self.clear_write_ready(lw)?; + self.clear_write_ready(waker)?; Poll::Pending } else { Poll::Ready(r) } } - fn poll_flush(&mut self, lw: &LocalWaker) -> Poll> { - ready!(self.poll_write_ready(lw)?); + fn poll_flush(&mut self, waker: &Waker) -> Poll> { + ready!(self.poll_write_ready(waker)?); let r = self.get_ref().flush(); if is_wouldblock(&r) { - self.clear_write_ready(lw)?; + self.clear_write_ready(waker)?; Poll::Pending } else { Poll::Ready(r) } } - fn poll_close(&mut self, _: &LocalWaker) -> Poll> { + fn poll_close(&mut self, _: &Waker) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/src/reactor/registration.rs b/src/reactor/registration.rs index 5785a964..e82cbeb4 100644 --- a/src/reactor/registration.rs +++ b/src/reactor/registration.rs @@ -1,6 +1,6 @@ use super::{Direction, HandlePriv}; -use futures::task::LocalWaker; +use futures::task::Waker; use futures::Poll; use mio::{self, Evented}; @@ -65,7 +65,7 @@ struct Inner { #[derive(Debug)] struct Node { direction: Direction, - waker: *const LocalWaker, + waker: *const Waker, next: *mut Node, } @@ -248,8 +248,8 @@ impl Registration { /// # Panics /// /// This function will panic if called from outside of a task context. - pub fn poll_read_ready(&self, lw: &LocalWaker) -> Poll> { - match self.poll_ready(Some(lw), Direction::Read) { + pub fn poll_read_ready(&self, waker: &Waker) -> Poll> { + match self.poll_ready(Some(waker), Direction::Read) { Ok(Some(v)) => Poll::Ready(Ok(v)), Ok(None) => Poll::Pending, Err(e) => Poll::Ready(Err(e)), @@ -299,8 +299,8 @@ impl Registration { /// # Panics /// /// This function will panic if called from outside of a task context. - pub fn poll_write_ready(&self, lw: &LocalWaker) -> Poll> { - match self.poll_ready(Some(lw), Direction::Write) { + pub fn poll_write_ready(&self, waker: &Waker) -> Poll> { + match self.poll_ready(Some(waker), Direction::Write) { Ok(Some(v)) => Poll::Ready(Ok(v)), Ok(None) => Poll::Pending, Err(e) => Poll::Ready(Err(e)), @@ -320,7 +320,7 @@ impl Registration { fn poll_ready( &self, - lw: Option<&LocalWaker>, + waker: Option<&Waker>, direction: Direction, ) -> io::Result> { let mut state = self.state.load(SeqCst); @@ -339,23 +339,23 @@ impl Registration { } READY => { let inner = unsafe { (*self.inner.get()).as_ref().unwrap() }; - return inner.poll_ready(lw, direction); + return inner.poll_ready(waker, direction); } LOCKED => { - if lw.is_none() { + if waker.is_none() { // Skip the notification tracking junk. return Ok(None); } let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node; - let lw = lw.unwrap(); + let waker = waker.unwrap(); // Get the node let mut n = node.take().unwrap_or_else(|| { Box::new(Node { direction, - waker: lw as *const LocalWaker, + waker: waker as *const Waker, next: ptr::null_mut(), }) }); @@ -414,21 +414,21 @@ impl Inner { (inner, res) } - fn register(&self, lw: &LocalWaker, direction: Direction) { + fn register(&self, waker: &Waker, direction: Direction) { if self.token == ERROR { - lw.wake(); + waker.wake(); return; } let inner = match self.handle.inner() { Some(inner) => inner, None => { - lw.wake(); + waker.wake(); return; } }; - inner.register(lw, self.token, direction); + inner.register(waker, self.token, direction); } fn deregister(&self, io: &E) -> io::Result<()> { @@ -449,7 +449,7 @@ impl Inner { fn poll_ready( &self, - lw: Option<&LocalWaker>, + waker: Option<&Waker>, direction: Direction, ) -> io::Result> { if self.token == ERROR { @@ -481,12 +481,12 @@ impl Inner { let mut ready = mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst)); - if ready.is_empty() && lw.is_some() { - let lw = lw.unwrap(); + if ready.is_empty() && waker.is_some() { + let waker = waker.unwrap(); // Update the task info match direction { - Direction::Read => sched.reader.register(lw), - Direction::Write => sched.writer.register(lw), + Direction::Read => sched.reader.register(waker), + Direction::Write => sched.writer.register(waker), } // Try again diff --git a/src/tcp/listener.rs b/src/tcp/listener.rs index 04f49f6c..4f694cc4 100644 --- a/src/tcp/listener.rs +++ b/src/tcp/listener.rs @@ -6,7 +6,7 @@ use std::net::{self, SocketAddr}; use std::pin::Pin; use futures::stream::Stream; -use futures::task::LocalWaker; +use futures::task::Waker; use futures::{ready, Poll}; use mio; @@ -194,8 +194,8 @@ impl TcpListener { self.io.get_ref().set_ttl(ttl) } - fn poll_accept(&mut self, lw: &LocalWaker) -> Poll> { - let (io, addr) = ready!(self.poll_accept_std(lw)?); + fn poll_accept(&mut self, waker: &Waker) -> Poll> { + let (io, addr) = ready!(self.poll_accept_std(waker)?); let io = mio::net::TcpStream::from_stream(io)?; let io = TcpStream::new(io); @@ -205,14 +205,14 @@ impl TcpListener { fn poll_accept_std( &mut self, - lw: &LocalWaker, + waker: &Waker, ) -> Poll> { - ready!(self.io.poll_read_ready(lw)?); + ready!(self.io.poll_read_ready(waker)?); match self.io.get_ref().accept_std() { Ok(pair) => Poll::Ready(Ok(pair)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(lw)?; + self.io.clear_read_ready(waker)?; Poll::Pending } Err(e) => Poll::Ready(Err(e)), @@ -249,8 +249,8 @@ pub struct Incoming<'a> { impl<'a> Stream for Incoming<'a> { type Item = io::Result; - fn poll_next(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll> { - let (socket, _) = ready!(self.inner.poll_accept(lw)?); + fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { + let (socket, _) = ready!(self.inner.poll_accept(waker)?); Poll::Ready(Some(Ok(socket))) } } diff --git a/src/tcp/stream.rs b/src/tcp/stream.rs index 6b88c9e9..e6df71ce 100644 --- a/src/tcp/stream.rs +++ b/src/tcp/stream.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use std::time::Duration; use futures::io::{AsyncRead, AsyncWrite}; -use futures::task::LocalWaker; +use futures::task::Waker; use futures::{ready, Future, Poll}; use iovec::IoVec; use mio; @@ -90,8 +90,8 @@ impl TcpStream { /// /// Once the stream is ready for reading, it will remain so until all available /// bytes have been extracted (via `futures::io::AsyncRead` and related traits). - pub fn poll_read_ready(&self, lw: &LocalWaker) -> Poll> { - self.io.poll_read_ready(lw) + pub fn poll_read_ready(&self, waker: &Waker) -> Poll> { + self.io.poll_read_ready(waker) } /// Check the TCP stream's write readiness state. @@ -108,8 +108,8 @@ impl TcpStream { /// # Panics /// /// This function panics if called from outside of a task context. - pub fn poll_write_ready(&self, lw: &LocalWaker) -> Poll> { - self.io.poll_write_ready(lw) + pub fn poll_write_ready(&self, waker: &Waker) -> Poll> { + self.io.poll_write_ready(waker) } /// Returns the local address that this stream is bound to. @@ -482,55 +482,55 @@ impl TcpStream { // ===== impl Read / Write ===== impl AsyncRead for TcpStream { - fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll> { - <&TcpStream>::poll_read(&mut &*self, lw, buf) + fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll> { + <&TcpStream>::poll_read(&mut &*self, waker, buf) } fn poll_vectored_read( &mut self, - lw: &LocalWaker, + waker: &Waker, vec: &mut [&mut IoVec], ) -> Poll> { - <&TcpStream>::poll_vectored_read(&mut &*self, lw, vec) + <&TcpStream>::poll_vectored_read(&mut &*self, waker, vec) } } impl AsyncWrite for TcpStream { - fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll> { - <&TcpStream>::poll_write(&mut &*self, lw, buf) + fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll> { + <&TcpStream>::poll_write(&mut &*self, waker, buf) } - fn poll_vectored_write(&mut self, lw: &LocalWaker, vec: &[&IoVec]) -> Poll> { - <&TcpStream>::poll_vectored_write(&mut &*self, lw, vec) + fn poll_vectored_write(&mut self, waker: &Waker, vec: &[&IoVec]) -> Poll> { + <&TcpStream>::poll_vectored_write(&mut &*self, waker, vec) } - fn poll_flush(&mut self, lw: &LocalWaker) -> Poll> { - <&TcpStream>::poll_flush(&mut &*self, lw) + fn poll_flush(&mut self, waker: &Waker) -> Poll> { + <&TcpStream>::poll_flush(&mut &*self, waker) } - fn poll_close(&mut self, lw: &LocalWaker) -> Poll> { - <&TcpStream>::poll_close(&mut &*self, lw) + fn poll_close(&mut self, waker: &Waker) -> Poll> { + <&TcpStream>::poll_close(&mut &*self, waker) } } // ===== impl Read / Write for &'a ===== impl<'a> AsyncRead for &'a TcpStream { - fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll> { - (&self.io).poll_read(lw, buf) + fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll> { + (&self.io).poll_read(waker, buf) } fn poll_vectored_read( &mut self, - lw: &LocalWaker, + waker: &Waker, bufs: &mut [&mut IoVec], ) -> Poll> { - ready!(self.poll_read_ready(lw)?); + ready!(self.poll_read_ready(waker)?); let r = self.io.get_ref().read_bufs(bufs); if is_wouldblock(&r) { - self.io.clear_read_ready(lw)?; + self.io.clear_read_ready(waker)?; Poll::Pending } else { Poll::Ready(r) @@ -539,28 +539,28 @@ impl<'a> AsyncRead for &'a TcpStream { } impl<'a> AsyncWrite for &'a TcpStream { - fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll> { - (&self.io).poll_write(lw, buf) + fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll> { + (&self.io).poll_write(waker, buf) } - fn poll_vectored_write(&mut self, lw: &LocalWaker, bufs: &[&IoVec]) -> Poll> { - ready!(self.poll_write_ready(lw)?); + fn poll_vectored_write(&mut self, waker: &Waker, bufs: &[&IoVec]) -> Poll> { + ready!(self.poll_write_ready(waker)?); let r = self.io.get_ref().write_bufs(bufs); if is_wouldblock(&r) { - self.io.clear_write_ready(lw)?; + self.io.clear_write_ready(waker)?; } return Poll::Ready(r); } - fn poll_flush(&mut self, lw: &LocalWaker) -> Poll> { - (&self.io).poll_flush(lw) + fn poll_flush(&mut self, waker: &Waker) -> Poll> { + (&self.io).poll_flush(waker) } - fn poll_close(&mut self, lw: &LocalWaker) -> Poll> { - (&self.io).poll_close(lw) + fn poll_close(&mut self, waker: &Waker) -> Poll> { + (&self.io).poll_close(waker) } } @@ -573,8 +573,8 @@ impl fmt::Debug for TcpStream { impl Future for ConnectFuture { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll> { - Pin::new(&mut self.inner).poll(lw) + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { + Pin::new(&mut self.inner).poll(waker) } } @@ -621,8 +621,8 @@ impl ConnectFutureState { impl Future for ConnectFutureState { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll> { - self.poll_inner(|io| io.poll_write_ready(lw)) + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { + self.poll_inner(|io| io.poll_write_ready(waker)) } } diff --git a/src/udp.rs b/src/udp.rs index 26d3faec..fd3977d6 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -16,7 +16,7 @@ use std::io; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::pin::Pin; -use futures::task::LocalWaker; +use futures::task::Waker; use futures::Future; use futures::{ready, Poll}; use mio; @@ -92,7 +92,7 @@ impl UdpSocket { /// If you prick us, do we not bleed? /// If you tickle us, do we not laugh? /// If you poison us, do we not die? - /// And if you wrong us, shall we not revenge? + /// And if you wrong us, shall we not revenge? /// "; /// /// # async fn send_data() -> Result<(), Box> { @@ -146,16 +146,16 @@ impl UdpSocket { /// notification when the socket becomes writable. pub fn poll_send_to( &mut self, - lw: &LocalWaker, + waker: &Waker, buf: &[u8], target: &SocketAddr, ) -> Poll> { - ready!(self.io.poll_write_ready(lw)?); + ready!(self.io.poll_write_ready(waker)?); match self.io.get_ref().send_to(buf, target) { Ok(n) => Poll::Ready(Ok(n)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(lw)?; + self.io.clear_write_ready(waker)?; Poll::Pending } Err(e) => Poll::Ready(Err(e)), @@ -168,15 +168,15 @@ impl UdpSocket { /// notification when the socket becomes readable. pub fn poll_recv_from( &mut self, - lw: &LocalWaker, + waker: &Waker, buf: &mut [u8], ) -> Poll> { - ready!(self.io.poll_read_ready(lw)?); + ready!(self.io.poll_read_ready(waker)?); match self.io.get_ref().recv_from(buf) { Ok(n) => Poll::Ready(Ok(n)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(lw)?; + self.io.clear_read_ready(waker)?; Poll::Pending } Err(e) => Poll::Ready(Err(e)), @@ -190,8 +190,8 @@ impl UdpSocket { /// /// The socket will remain in a read-ready state until calls to `poll_recv` /// return `Pending`. - pub fn poll_read_ready(&self, lw: &LocalWaker) -> Poll> { - self.io.poll_read_ready(lw) + pub fn poll_read_ready(&self, waker: &Waker) -> Poll> { + self.io.poll_read_ready(waker) } /// Check the UDP socket's write readiness state. @@ -201,8 +201,8 @@ impl UdpSocket { /// /// The I/O resource will remain in a write-ready state until calls to /// `poll_send` return `Pending`. - pub fn poll_write_ready(&self, lw: &LocalWaker) -> Poll> { - self.io.poll_write_ready(lw) + pub fn poll_write_ready(&self, waker: &Waker) -> Poll> { + self.io.poll_write_ready(waker) } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -399,9 +399,9 @@ pub struct SendTo<'a, 'b> { impl<'a, 'b> Future for SendTo<'a, 'b> { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { let SendTo { socket, buf, target } = &mut *self; - socket.poll_send_to(lw, buf, target) + socket.poll_send_to(waker, buf, target) } } @@ -415,8 +415,8 @@ pub struct RecvFrom<'a, 'b> { impl<'a, 'b> Future for RecvFrom<'a, 'b> { type Output = io::Result<(usize, SocketAddr)>; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { let RecvFrom { socket, buf } = &mut *self; - socket.poll_recv_from(lw, buf) + socket.poll_recv_from(waker, buf) } } diff --git a/src/uds/datagram.rs b/src/uds/datagram.rs index 0f4f6a8f..72d50dc3 100644 --- a/src/uds/datagram.rs +++ b/src/uds/datagram.rs @@ -1,6 +1,6 @@ use crate::reactor::PollEvented; -use futures::task::LocalWaker; +use futures::task::Waker; use futures::{ready, Poll}; use mio::Ready; use mio_uds; @@ -79,13 +79,13 @@ impl UnixDatagram { } /// Test whether this socket is ready to be read or not. - pub fn poll_read_ready(&self, lw: &LocalWaker) -> Poll> { - self.io.poll_read_ready(lw) + pub fn poll_read_ready(&self, waker: &Waker) -> Poll> { + self.io.poll_read_ready(waker) } /// Test whether this socket is ready to be written to or not. - pub fn poll_write_ready(&self, lw: &LocalWaker) -> Poll> { - self.io.poll_write_ready(lw) + pub fn poll_write_ready(&self, waker: &Waker) -> Poll> { + self.io.poll_write_ready(waker) } /// Returns the local address that this socket is bound to. @@ -127,15 +127,15 @@ impl UnixDatagram { /// whence the data came. pub fn poll_recv_from( &self, - lw: &LocalWaker, + waker: &Waker, buf: &mut [u8], ) -> Poll> { - ready!(self.io.poll_read_ready(lw)?); + ready!(self.io.poll_read_ready(waker)?); let r = self.io.get_ref().recv_from(buf); if is_wouldblock(&r) { - self.io.clear_read_ready(lw)?; + self.io.clear_read_ready(waker)?; Poll::Pending } else { Poll::Ready(r) @@ -147,16 +147,16 @@ impl UnixDatagram { /// On success, returns the number of bytes written. pub fn poll_send_to( &self, - lw: &LocalWaker, + waker: &Waker, buf: &[u8], path: impl AsRef, ) -> Poll> { - ready!(self.io.poll_write_ready(lw)?); + ready!(self.io.poll_write_ready(waker)?); let r = self.io.get_ref().send_to(buf, path); if is_wouldblock(&r) { - self.io.clear_write_ready(lw)?; + self.io.clear_write_ready(waker)?; Poll::Pending } else { Poll::Ready(r) diff --git a/src/uds/listener.rs b/src/uds/listener.rs index a69bca39..dabe94d2 100644 --- a/src/uds/listener.rs +++ b/src/uds/listener.rs @@ -2,7 +2,7 @@ use super::UnixStream; use crate::reactor::PollEvented; -use futures::task::LocalWaker; +use futures::task::Waker; use futures::{ready, Poll, Stream}; use mio_uds; @@ -128,24 +128,24 @@ impl UnixListener { Incoming::new(self) } - fn poll_accept(&self, lw: &LocalWaker) -> Poll> { - let (io, addr) = ready!(self.poll_accept_std(lw)?); + fn poll_accept(&self, waker: &Waker) -> Poll> { + let (io, addr) = ready!(self.poll_accept_std(waker)?); let io = mio_uds::UnixStream::from_stream(io)?; Poll::Ready(Ok((UnixStream::new(io), addr))) } - fn poll_accept_std(&self, lw: &LocalWaker) -> Poll> { - ready!(self.io.poll_read_ready(lw)?); + fn poll_accept_std(&self, waker: &Waker) -> Poll> { + ready!(self.io.poll_read_ready(waker)?); match self.io.get_ref().accept_std() { Ok(Some((sock, addr))) => Poll::Ready(Ok((sock, addr))), Ok(None) => { - self.io.clear_read_ready(lw)?; + self.io.clear_read_ready(waker)?; Poll::Pending } Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(lw)?; + self.io.clear_read_ready(waker)?; Poll::Pending } Err(err) => Poll::Ready(Err(err)), @@ -180,8 +180,8 @@ impl Incoming { impl Stream for Incoming { type Item = io::Result; - fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll> { - let (socket, _) = ready!(self.inner.poll_accept(lw)?); + fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { + let (socket, _) = ready!(self.inner.poll_accept(waker)?); Poll::Ready(Some(Ok(socket))) } } diff --git a/src/uds/stream.rs b/src/uds/stream.rs index fbf5a77c..97b34c6e 100644 --- a/src/uds/stream.rs +++ b/src/uds/stream.rs @@ -3,7 +3,7 @@ use super::ucred::{self, UCred}; use crate::reactor::PollEvented; use futures::io::{AsyncRead, AsyncWrite}; -use futures::task::LocalWaker; +use futures::task::Waker; use futures::{ready, Future, Poll}; use iovec::IoVec; use mio::Ready; @@ -97,13 +97,13 @@ impl UnixStream { } /// Test whether this socket is ready to be read or not. - pub fn poll_read_ready(&self, lw: &LocalWaker) -> Poll> { - self.io.poll_read_ready(lw) + pub fn poll_read_ready(&self, waker: &Waker) -> Poll> { + self.io.poll_read_ready(waker) } /// Test whether this socket is ready to be written to or not. - pub fn poll_write_ready(&self, lw: &LocalWaker) -> Poll> { - self.io.poll_write_ready(lw) + pub fn poll_write_ready(&self, waker: &Waker) -> Poll> { + self.io.poll_write_ready(waker) } /// Returns the socket address of the local half of this connection. @@ -198,53 +198,53 @@ impl UnixStream { } impl AsyncRead for UnixStream { - fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll> { - <&UnixStream>::poll_read(&mut &*self, lw, buf) + fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll> { + <&UnixStream>::poll_read(&mut &*self, waker, buf) } fn poll_vectored_read( &mut self, - lw: &LocalWaker, + waker: &Waker, vec: &mut [&mut IoVec], ) -> Poll> { - <&UnixStream>::poll_vectored_read(&mut &*self, lw, vec) + <&UnixStream>::poll_vectored_read(&mut &*self, waker, vec) } } impl AsyncWrite for UnixStream { - fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll> { - <&UnixStream>::poll_write(&mut &*self, lw, buf) + fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll> { + <&UnixStream>::poll_write(&mut &*self, waker, buf) } - fn poll_vectored_write(&mut self, lw: &LocalWaker, vec: &[&IoVec]) -> Poll> { - <&UnixStream>::poll_vectored_write(&mut &*self, lw, vec) + fn poll_vectored_write(&mut self, waker: &Waker, vec: &[&IoVec]) -> Poll> { + <&UnixStream>::poll_vectored_write(&mut &*self, waker, vec) } - fn poll_flush(&mut self, lw: &LocalWaker) -> Poll> { - <&UnixStream>::poll_flush(&mut &*self, lw) + fn poll_flush(&mut self, waker: &Waker) -> Poll> { + <&UnixStream>::poll_flush(&mut &*self, waker) } - fn poll_close(&mut self, lw: &LocalWaker) -> Poll> { - <&UnixStream>::poll_close(&mut &*self, lw) + fn poll_close(&mut self, waker: &Waker) -> Poll> { + <&UnixStream>::poll_close(&mut &*self, waker) } } impl<'a> AsyncRead for &'a UnixStream { - fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll> { - (&self.io).poll_read(lw, buf) + fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll> { + (&self.io).poll_read(waker, buf) } fn poll_vectored_read( &mut self, - lw: &LocalWaker, + waker: &Waker, bufs: &mut [&mut IoVec], ) -> Poll> { - ready!(self.poll_read_ready(lw)?); + ready!(self.poll_read_ready(waker)?); let r = self.io.get_ref().read_bufs(bufs); if is_wouldblock(&r) { - self.io.clear_read_ready(lw)?; + self.io.clear_read_ready(waker)?; Poll::Pending } else { Poll::Ready(r) @@ -253,28 +253,28 @@ impl<'a> AsyncRead for &'a UnixStream { } impl<'a> AsyncWrite for &'a UnixStream { - fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll> { - (&self.io).poll_write(lw, buf) + fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll> { + (&self.io).poll_write(waker, buf) } - fn poll_vectored_write(&mut self, lw: &LocalWaker, bufs: &[&IoVec]) -> Poll> { - ready!(self.poll_write_ready(lw)?); + fn poll_vectored_write(&mut self, waker: &Waker, bufs: &[&IoVec]) -> Poll> { + ready!(self.poll_write_ready(waker)?); let r = self.io.get_ref().write_bufs(bufs); if is_wouldblock(&r) { - self.io.clear_write_ready(lw)?; + self.io.clear_write_ready(waker)?; } return Poll::Ready(r); } - fn poll_flush(&mut self, lw: &LocalWaker) -> Poll> { - (&self.io).poll_flush(lw) + fn poll_flush(&mut self, waker: &Waker) -> Poll> { + (&self.io).poll_flush(waker) } - fn poll_close(&mut self, lw: &LocalWaker) -> Poll> { - (&self.io).poll_close(lw) + fn poll_close(&mut self, waker: &Waker) -> Poll> { + (&self.io).poll_close(waker) } } @@ -293,12 +293,12 @@ impl AsRawFd for UnixStream { impl Future for ConnectFuture { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll> { + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { use std::mem; match self.inner { State::Waiting(ref mut stream) => { - ready!(stream.io.poll_write_ready(lw)?); + ready!(stream.io.poll_write_ready(waker)?); if let Some(e) = stream.io.get_ref().take_error()? { return Poll::Ready(Err(e)); diff --git a/tests/tcp.rs b/tests/tcp.rs index 8b12b60d..37ef8e1c 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -1,4 +1,4 @@ -#![feature(async_await, await_macro)] +#![feature(async_await, await_macro, futures_api)] use std::io::{Read, Write}; use std::net::TcpStream; use std::thread; diff --git a/tests/uds.rs b/tests/uds.rs index 59028490..8f72fe49 100644 --- a/tests/uds.rs +++ b/tests/uds.rs @@ -4,7 +4,7 @@ use std::io::{Read, Write}; use std::os::unix::net::UnixStream as StdStream; use std::thread; -use std::task::LocalWaker; +use std::task::Waker; use std::pin::Pin; use futures::future::{FutureObj, self}; use futures::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; @@ -148,9 +148,9 @@ impl RomioReader { impl Stream for RomioReader { type Item = Vec; - fn poll_next(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { let this = &mut *self; - match this.inner.poll_read(lw, this.buffer.as_mut()) { + match this.inner.poll_read(waker, this.buffer.as_mut()) { Poll::Pending => Poll::Pending, Poll::Ready(Err(e)) => { error!("Failed to poll_read {:?}", e);