Skip to content
This repository has been archived by the owner on Nov 7, 2019. It is now read-only.

Update futures-preview to 0.3.0-alpha.13 #66

Merged
merged 1 commit into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/reactor/background.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{Handle, Reactor};

use futures::task::{AtomicWaker, LocalWaker};
use futures::task::{AtomicWaker, Waker};
use futures::{executor, Future, Poll};
use log::debug;

Expand Down Expand Up @@ -107,8 +107,8 @@ impl Drop for Background {
impl Future for Shutdown {
type Output = Result<(), ()>;

fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
self.inner.shared.shutdown_task.register(lw);
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
self.inner.shared.shutdown_task.register(waker);

if !self.inner.is_shutdown() {
return Poll::Pending;
Expand Down
10 changes: 5 additions & 5 deletions src/reactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use std::{fmt, usize};

use futures::task::{AtomicWaker, LocalWaker};
use futures::task::{AtomicWaker, Waker};
use log::{debug, log_enabled, trace, Level};
use mio::event::Evented;
use slab::Slab;
Expand Down Expand Up @@ -507,20 +507,20 @@ impl Inner {
}

/// Registers interest in the I/O resource associated with `token`.
fn register(&self, lw: &LocalWaker, token: usize, dir: Direction) {
fn register(&self, waker: &Waker, token: usize, dir: Direction) {
debug!("scheduling direction for: {}", token);
let io_dispatch = self.io_dispatch.read();
let sched = io_dispatch.get(token).unwrap();

let (waker, ready) = match dir {
let (atomic_waker, ready) = match dir {
Direction::Read => (&sched.reader, !mio::Ready::writable()),
Direction::Write => (&sched.writer, mio::Ready::writable()),
};

waker.register(lw);
atomic_waker.register(waker);

if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
waker.wake();
atomic_waker.wake();
}
}
}
Expand Down
62 changes: 31 additions & 31 deletions src/reactor/poll_evented.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Registration;

use futures::io::{AsyncRead, AsyncWrite};
use futures::task::LocalWaker;
use futures::task::Waker;
use futures::{ready, Poll};
use mio;
use mio::event::Evented;
Expand Down Expand Up @@ -160,7 +160,7 @@ where
/// cleared by calling [`clear_read_ready`].
///
/// [`clear_read_ready`]: #method.clear_read_ready
pub fn poll_read_ready(&self, lw: &LocalWaker) -> Poll<io::Result<mio::Ready>> {
pub fn poll_read_ready(&self, waker: &Waker) -> Poll<io::Result<mio::Ready>> {
self.register()?;

// Load cached & encoded readiness.
Expand All @@ -175,7 +175,7 @@ where
// stream. This happens in a loop to ensure that the stream gets
// drained.
loop {
let ready = ready!(self.inner.registration.poll_read_ready(lw)?);
let ready = ready!(self.inner.registration.poll_read_ready(waker)?);
cached |= ready.as_usize();

// Update the cache store
Expand Down Expand Up @@ -207,14 +207,14 @@ where
///
/// The `mask` argument specifies the readiness bits to clear. This may not
/// include `writable` or `hup`.
pub fn clear_read_ready(&self, lw: &LocalWaker) -> io::Result<()> {
pub fn clear_read_ready(&self, waker: &Waker) -> io::Result<()> {
self.inner
.read_readiness
.fetch_and(!mio::Ready::readable().as_usize(), Relaxed);

if self.poll_read_ready(lw)?.is_ready() {
if self.poll_read_ready(waker)?.is_ready() {
// Notify the current task
lw.wake();
waker.wake();
}

Ok(())
Expand All @@ -239,7 +239,7 @@ where
///
/// * `ready` contains bits besides `writable` and `hup`.
/// * called from outside of a task context.
pub fn poll_write_ready(&self, lw: &LocalWaker) -> Poll<Result<mio::Ready, io::Error>> {
pub fn poll_write_ready(&self, waker: &Waker) -> Poll<Result<mio::Ready, io::Error>> {
self.register()?;

// Load cached & encoded readiness.
Expand All @@ -254,7 +254,7 @@ where
// stream. This happens in a loop to ensure that the stream gets
// drained.
loop {
let ready = ready!(self.inner.registration.poll_write_ready(lw)?);
let ready = ready!(self.inner.registration.poll_write_ready(waker)?);
cached |= ready.as_usize();

// Update the cache store
Expand Down Expand Up @@ -290,14 +290,14 @@ where
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn clear_write_ready(&self, lw: &LocalWaker) -> io::Result<()> {
pub fn clear_write_ready(&self, waker: &Waker) -> io::Result<()> {
self.inner
.write_readiness
.fetch_and(!mio::Ready::writable().as_usize(), Relaxed);

if self.poll_write_ready(lw)?.is_ready() {
if self.poll_write_ready(waker)?.is_ready() {
// Notify the current task
lw.wake();
waker.wake();
}

Ok(())
Expand All @@ -318,13 +318,13 @@ impl<E> AsyncRead for PollEvented<E>
where
E: Evented + Read,
{
fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
ready!(self.poll_read_ready(lw)?);
fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
ready!(self.poll_read_ready(waker)?);

let r = self.get_mut().read(buf);

if is_wouldblock(&r) {
self.clear_read_ready(lw)?;
self.clear_read_ready(waker)?;
Poll::Pending
} else {
Poll::Ready(r)
Expand All @@ -336,33 +336,33 @@ impl<E> AsyncWrite for PollEvented<E>
where
E: Evented + Write,
{
fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll<io::Result<usize>> {
ready!(self.poll_write_ready(lw)?);
fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll<io::Result<usize>> {
ready!(self.poll_write_ready(waker)?);

let r = self.get_mut().write(buf);

if is_wouldblock(&r) {
self.clear_write_ready(lw)?;
self.clear_write_ready(waker)?;
Poll::Pending
} else {
Poll::Ready(r)
}
}

fn poll_flush(&mut self, lw: &LocalWaker) -> Poll<io::Result<()>> {
ready!(self.poll_write_ready(lw)?);
fn poll_flush(&mut self, waker: &Waker) -> Poll<io::Result<()>> {
ready!(self.poll_write_ready(waker)?);

let r = self.get_mut().flush();

if is_wouldblock(&r) {
self.clear_write_ready(lw)?;
self.clear_write_ready(waker)?;
Poll::Pending
} else {
Poll::Ready(r)
}
}

fn poll_close(&mut self, _: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_close(&mut self, _: &Waker) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
Expand All @@ -374,13 +374,13 @@ where
E: Evented,
&'a E: Read,
{
fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
ready!(self.poll_read_ready(lw)?);
fn poll_read(&mut self, waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
ready!(self.poll_read_ready(waker)?);

let r = self.get_ref().read(buf);

if is_wouldblock(&r) {
self.clear_read_ready(lw)?;
self.clear_read_ready(waker)?;
Poll::Pending
} else {
Poll::Ready(r)
Expand All @@ -393,33 +393,33 @@ where
E: Evented,
&'a E: Write,
{
fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll<io::Result<usize>> {
ready!(self.poll_write_ready(lw)?);
fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll<io::Result<usize>> {
ready!(self.poll_write_ready(waker)?);

let r = self.get_ref().write(buf);

if is_wouldblock(&r) {
self.clear_write_ready(lw)?;
self.clear_write_ready(waker)?;
Poll::Pending
} else {
Poll::Ready(r)
}
}

fn poll_flush(&mut self, lw: &LocalWaker) -> Poll<io::Result<()>> {
ready!(self.poll_write_ready(lw)?);
fn poll_flush(&mut self, waker: &Waker) -> Poll<io::Result<()>> {
ready!(self.poll_write_ready(waker)?);

let r = self.get_ref().flush();

if is_wouldblock(&r) {
self.clear_write_ready(lw)?;
self.clear_write_ready(waker)?;
Poll::Pending
} else {
Poll::Ready(r)
}
}

fn poll_close(&mut self, _: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_close(&mut self, _: &Waker) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
Expand Down
40 changes: 20 additions & 20 deletions src/reactor/registration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{Direction, HandlePriv};

use futures::task::LocalWaker;
use futures::task::Waker;
use futures::Poll;
use mio::{self, Evented};

Expand Down Expand Up @@ -65,7 +65,7 @@ struct Inner {
#[derive(Debug)]
struct Node {
direction: Direction,
waker: *const LocalWaker,
waker: *const Waker,
next: *mut Node,
}

Expand Down Expand Up @@ -248,8 +248,8 @@ impl Registration {
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_read_ready(&self, lw: &LocalWaker) -> Poll<io::Result<mio::Ready>> {
match self.poll_ready(Some(lw), Direction::Read) {
pub fn poll_read_ready(&self, waker: &Waker) -> Poll<io::Result<mio::Ready>> {
match self.poll_ready(Some(waker), Direction::Read) {
Ok(Some(v)) => Poll::Ready(Ok(v)),
Ok(None) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
Expand Down Expand Up @@ -299,8 +299,8 @@ impl Registration {
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_write_ready(&self, lw: &LocalWaker) -> Poll<io::Result<mio::Ready>> {
match self.poll_ready(Some(lw), Direction::Write) {
pub fn poll_write_ready(&self, waker: &Waker) -> Poll<io::Result<mio::Ready>> {
match self.poll_ready(Some(waker), Direction::Write) {
Ok(Some(v)) => Poll::Ready(Ok(v)),
Ok(None) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
Expand All @@ -320,7 +320,7 @@ impl Registration {

fn poll_ready(
&self,
lw: Option<&LocalWaker>,
waker: Option<&Waker>,
direction: Direction,
) -> io::Result<Option<mio::Ready>> {
let mut state = self.state.load(SeqCst);
Expand All @@ -339,23 +339,23 @@ impl Registration {
}
READY => {
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
return inner.poll_ready(lw, direction);
return inner.poll_ready(waker, direction);
}
LOCKED => {
if lw.is_none() {
if waker.is_none() {
// Skip the notification tracking junk.
return Ok(None);
}

let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node;

let lw = lw.unwrap();
let waker = waker.unwrap();

// Get the node
let mut n = node.take().unwrap_or_else(|| {
Box::new(Node {
direction,
waker: lw as *const LocalWaker,
waker: waker as *const Waker,
next: ptr::null_mut(),
})
});
Expand Down Expand Up @@ -414,21 +414,21 @@ impl Inner {
(inner, res)
}

fn register(&self, lw: &LocalWaker, direction: Direction) {
fn register(&self, waker: &Waker, direction: Direction) {
if self.token == ERROR {
lw.wake();
waker.wake();
return;
}

let inner = match self.handle.inner() {
Some(inner) => inner,
None => {
lw.wake();
waker.wake();
return;
}
};

inner.register(lw, self.token, direction);
inner.register(waker, self.token, direction);
}

fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> {
Expand All @@ -449,7 +449,7 @@ impl Inner {

fn poll_ready(
&self,
lw: Option<&LocalWaker>,
waker: Option<&Waker>,
direction: Direction,
) -> io::Result<Option<mio::Ready>> {
if self.token == ERROR {
Expand Down Expand Up @@ -481,12 +481,12 @@ impl Inner {
let mut ready =
mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst));

if ready.is_empty() && lw.is_some() {
let lw = lw.unwrap();
if ready.is_empty() && waker.is_some() {
let waker = waker.unwrap();
// Update the task info
match direction {
Direction::Read => sched.reader.register(lw),
Direction::Write => sched.writer.register(lw),
Direction::Read => sched.reader.register(waker),
Direction::Write => sched.writer.register(waker),
}

// Try again
Expand Down
Loading