Skip to content

Commit

Permalink
Reduce unsafe usage (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevyn authored Jul 7, 2022
1 parent 81bfb05 commit d48ec2c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 27 deletions.
1 change: 1 addition & 0 deletions async-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ repository = "/~https://github.com/tokio-rs/async-stream"
[dependencies]
async-stream-impl = { version = "=0.3.3", path = "../async-stream-impl" }
futures-core = "0.3"
pin-project-lite = "0.2"

[dev-dependencies]
futures-util = "0.3"
Expand Down
52 changes: 27 additions & 25 deletions async-stream/src/async_stream.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use crate::yielder::Receiver;

use futures_core::{FusedStream, Stream};
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

#[doc(hidden)]
#[derive(Debug)]
pub struct AsyncStream<T, U> {
rx: Receiver<T>,
done: bool,
generator: U,
pin_project! {
#[doc(hidden)]
#[derive(Debug)]
pub struct AsyncStream<T, U> {
rx: Receiver<T>,
done: bool,
#[pin]
generator: U,
}
}

impl<T, U> AsyncStream<T, U> {
Expand Down Expand Up @@ -40,30 +44,28 @@ where
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe {
let me = Pin::get_unchecked_mut(self);
let me = self.project();

if me.done {
return Poll::Ready(None);
}
if *me.done {
return Poll::Ready(None);
}

let mut dst = None;
let res = {
let _enter = me.rx.enter(&mut dst);
Pin::new_unchecked(&mut me.generator).poll(cx)
};
let mut dst = None;
let res = {
let _enter = me.rx.enter(&mut dst);
me.generator.poll(cx)
};

me.done = res.is_ready();
*me.done = res.is_ready();

if dst.is_some() {
return Poll::Ready(dst.take());
}
if dst.is_some() {
return Poll::Ready(dst.take());
}

if me.done {
Poll::Ready(None)
} else {
Poll::Pending
}
if *me.done {
Poll::Ready(None)
} else {
Poll::Pending
}
}

Expand Down
4 changes: 2 additions & 2 deletions async-stream/src/yielder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl<T> Future for Send<T> {
return Poll::Ready(());
}

STORE.with(|cell| unsafe {
STORE.with(|cell| {
let ptr = cell.get() as *mut Option<T>;
let option_ref = ptr.as_mut().expect("invalid usage");
let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage");

if option_ref.is_none() {
*option_ref = self.value.take();
Expand Down

0 comments on commit d48ec2c

Please sign in to comment.