Skip to content

Commit

Permalink
Merge #563
Browse files Browse the repository at this point in the history
563: [WIP] Add try_fold, try_for_each, try_reduce r=nikomatsakis a=cuviper

There are six variations here, matching the existing non-try suite:

- `try_fold` and `try_fold_with`
- `try_for_each` and `try_for_each_with`
- `try_reduce` and `try_reduce_with`

All of them operate on `Try::Ok` values similar to the exiting non-try
methods, and short-circuit early to return any `Try::Error` value seen.
This `Try` is a pub-in-private clone of the unstable `std::ops::Try`,
implemented for `Option<T>` and `Result<T, E>`.

TODO and open questions:

- [ ] Needs documentation, examples, and tests.
- [x] Should we wait for `Iterator::try_fold` and `try_for_each` to
      reach rust stable?  They were stabilized in rust-lang/rust#49607,
      but there's always a chance this could get backed out.
    - **Resolved**: they're stable in 1.27
- [x] Should we wait for stable `std::ops::Try`?  We could just keep
      ours private for now, and change to publicly use the standard
      trait later (on sufficiently new rustc).
    - **Resolved**: keep our pub-in-private `Try` for now.
- [x] Should `try_fold` and `try_fold_with` continue to short-circuit
      globally, or change to only a local check?
  - When I first implemented `try_reduce_with`, I use a `try_fold` +
    `try_reduce` combination, like `reduce_with`'s implementation, but
    I didn't like the idea of having double `full: AtomicBool` flags
    in use.
  - If `try_fold` only errors locally, then other threads can continue
    folding normally, and you can decide what to do with the error
    when you further reduce/collect/etc.  e.g. A following `try_reduce`
    will still short-circuit globally.
  - **Resolved**: changed to just a local check.

Closes #495.

Co-authored-by: Josh Stone <cuviper@gmail.com>
  • Loading branch information
bors[bot] and cuviper committed Jun 27, 2018
2 parents da16c2c + 800c736 commit 8ba64ab
Show file tree
Hide file tree
Showing 8 changed files with 773 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/compile_fail/must_use.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ must_use! {
flatten /** v.par_iter().flatten(); */
fold /** v.par_iter().fold(|| 0, |x, _| x); */
fold_with /** v.par_iter().fold_with(0, |x, _| x); */
try_fold /** v.par_iter().try_fold(|| 0, |x, _| Some(x)); */
try_fold_with /** v.par_iter().try_fold_with(0, |x, _| Some(x)); */
inspect /** v.par_iter().inspect(|_| {}); */
interleave /** v.par_iter().interleave(&v); */
interleave_shortest /** v.par_iter().interleave_shortest(&v); */
Expand Down
2 changes: 1 addition & 1 deletion src/iter/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<'r, C, ID, F, T> Folder<T> for FoldFolder<'r, C, ID, F>

pub fn fold_with<U, I, F>(base: I, item: U, fold_op: F) -> FoldWith<I, U, F>
where I: ParallelIterator,
F: Fn(U, I::Item) -> U + Sync,
F: Fn(U, I::Item) -> U + Sync + Send,
U: Send + Clone
{
FoldWith {
Expand Down
253 changes: 253 additions & 0 deletions src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,19 @@
//! [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
//! [split]: fn.split.html
//! [plumbing]: plumbing
//!
//! Note: Several of the `ParallelIterator` methods rely on a `Try` trait which
//! has been deliberately obscured from the public API. This trait is intended
//! to mirror the unstable `std::ops::Try` with implementations for `Option` and
//! `Result`, where `Some`/`Ok` values will let those iterators continue, but
//! `None`/`Err` values will exit early.
pub use either::Either;
use std::cmp::{self, Ordering};
use std::iter::{Sum, Product};
use std::ops::Fn;
use self::plumbing::*;
use self::private::Try;

// There is a method to the madness here:
//
Expand Down Expand Up @@ -109,7 +116,11 @@ pub mod plumbing;
mod for_each;
mod fold;
pub use self::fold::{Fold, FoldWith};
mod try_fold;
pub use self::try_fold::{TryFold, TryFoldWith};
mod reduce;
mod try_reduce;
mod try_reduce_with;
mod skip;
pub use self::skip::Skip;
mod splitter;
Expand Down Expand Up @@ -364,6 +375,69 @@ pub trait ParallelIterator: Sized + Send {
self.map_with(init, op).for_each(|()| ())
}

/// Executes a fallible `OP` on each item produced by the iterator, in parallel.
///
/// If the `OP` returns `Result::Err` or `Option::None`, we will attempt to
/// stop processing the rest of the items in the iterator as soon as
/// possible, and we will return that terminating value. Otherwise, we will
/// return an empty `Result::Ok(())` or `Option::Some(())`. If there are
/// multiple errors in parallel, it is not specified which will be returned.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// use std::io::{self, Write};
///
/// // This will stop iteration early if there's any write error, like
/// // having piped output get closed on the other end.
/// (0..100).into_par_iter()
/// .try_for_each(|x| writeln!(io::stdout(), "{:?}", x))
/// .expect("expected no write errors");
/// ```
fn try_for_each<OP, R>(self, op: OP) -> R
where OP: Fn(Self::Item) -> R + Sync + Send,
R: Try<Ok = ()> + Send
{
self.map(op).try_reduce(|| (), |(), ()| R::from_ok(()))
}

/// Executes a fallible `OP` on the given `init` value with each item
/// produced by the iterator, in parallel.
///
/// This combines the `init` semantics of [`for_each_with()`] and the
/// failure semantics of [`try_for_each()`].
///
/// [`for_each_with()`]: #method.for_each_with
/// [`try_for_each()`]: #method.try_for_each
///
/// # Examples
///
/// ```
/// use std::sync::mpsc::channel;
/// use rayon::prelude::*;
///
/// let (sender, receiver) = channel();
///
/// (0..5).into_par_iter()
/// .try_for_each_with(sender, |s, x| s.send(x))
/// .expect("expected no send errors");
///
/// let mut res: Vec<_> = receiver.iter().collect();
///
/// res.sort();
///
/// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
/// ```
fn try_for_each_with<OP, T, R>(self, init: T, op: OP) -> R
where OP: Fn(&mut T, Self::Item) -> R + Sync + Send,
T: Send + Clone,
R: Try<Ok = ()> + Send
{
self.map_with(init, op)
.try_reduce(|| (), |(), ()| R::from_ok(()))
}

/// Counts the number of items in this parallel iterator.
///
/// # Examples
Expand Down Expand Up @@ -679,6 +753,87 @@ pub trait ParallelIterator: Sized + Send {
})
}

/// Reduces the items in the iterator into one item using a fallible `op`.
/// The `identity` argument is used the same way as in [`reduce()`].
///
/// [`reduce()`]: #method.reduce
///
/// If a `Result::Err` or `Option::None` item is found, or if `op` reduces
/// to one, we will attempt to stop processing the rest of the items in the
/// iterator as soon as possible, and we will return that terminating value.
/// Otherwise, we will return the final reduced `Result::Ok(T)` or
/// `Option::Some(T)`. If there are multiple errors in parallel, it is not
/// specified which will be returned.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// // Compute the sum of squares, being careful about overflow.
/// fn sum_squares<I: IntoParallelIterator<Item = i32>>(iter: I) -> Option<i32> {
/// iter.into_par_iter()
/// .map(|i| i.checked_mul(i)) // square each item,
/// .try_reduce(|| 0, i32::checked_add) // and add them up!
/// }
/// assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16));
///
/// // The sum might overflow
/// assert_eq!(sum_squares(0..10_000), None);
///
/// // Or the squares might overflow before it even reaches `try_reduce`
/// assert_eq!(sum_squares(1_000_000..1_000_001), None);
/// ```
fn try_reduce<T, OP, ID>(self, identity: ID, op: OP) -> Self::Item
where OP: Fn(T, T) -> Self::Item + Sync + Send,
ID: Fn() -> T + Sync + Send,
Self::Item: Try<Ok = T>
{
try_reduce::try_reduce(self, identity, op)
}

/// Reduces the items in the iterator into one item using a fallible `op`.
///
/// Like [`reduce_with()`], if the iterator is empty, `None` is returned;
/// otherwise, `Some` is returned. Beyond that, it behaves like
/// [`try_reduce()`] for handling `Err`/`None`.
///
/// [`reduce_with()`]: #method.reduce_with
/// [`try_reduce()`]: #method.try_reduce
///
/// For instance, with `Option` items, the return value may be:
/// - `None`, the iterator was empty
/// - `Some(None)`, we stopped after encountering `None`.
/// - `Some(Some(x))`, the entire iterator reduced to `x`.
///
/// With `Result` items, the nesting is more obvious:
/// - `None`, the iterator was empty
/// - `Some(Err(e))`, we stopped after encountering an error `e`.
/// - `Some(Ok(x))`, the entire iterator reduced to `x`.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let files = ["/dev/null", "/does/not/exist"];
///
/// // Find the biggest file
/// files.into_par_iter()
/// .map(|path| std::fs::metadata(path).map(|m| (path, m.len())))
/// .try_reduce_with(|a, b| {
/// Ok(if a.1 >= b.1 { a } else { b })
/// })
/// .expect("Some value, since the iterator is not empty")
/// .expect_err("not found");
/// ```
fn try_reduce_with<T, OP>(self, op: OP) -> Option<Self::Item>
where OP: Fn(T, T) -> Self::Item + Sync + Send,
Self::Item: Try<Ok = T>,
{
try_reduce_with::try_reduce_with(self, op)
}

/// Parallel fold is similar to sequential fold except that the
/// sequence of items may be subdivided before it is
/// folded. Consider a list of numbers like `22 3 77 89 46`. If
Expand Down Expand Up @@ -846,6 +1001,65 @@ pub trait ParallelIterator: Sized + Send {
fold::fold_with(self, init, fold_op)
}

/// Perform a fallible parallel fold.
///
/// This is a variation of [`fold()`] for operations which can fail with
/// `Option::None` or `Result::Err`. The first such failure stops
/// processing the local set of items, without affecting other folds in the
/// iterator's subdivisions.
///
/// Often, `try_fold()` will be followed by [`try_reduce()`]
/// for a final reduction and global short-circuiting effect.
///
/// [`fold()`]: #method.fold
/// [`try_reduce()`]: #method.try_reduce
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let bytes = 0..22_u8;
/// let sum = bytes.into_par_iter()
/// .try_fold(|| 0_u32, |a: u32, b: u8| a.checked_add(b as u32))
/// .try_reduce(|| 0, u32::checked_add);
///
/// assert_eq!(sum, Some((0..22).sum())); // compare to sequential
/// ```
fn try_fold<T, R, ID, F>(self, identity: ID, fold_op: F) -> TryFold<Self, R, ID, F>
where F: Fn(T, Self::Item) -> R + Sync + Send,
ID: Fn() -> T + Sync + Send,
R: Try<Ok = T> + Send
{
try_fold::try_fold(self, identity, fold_op)
}

/// Perform a fallible parallel fold with a cloneable `init` value.
///
/// This combines the `init` semantics of [`fold_with()`] and the failure
/// semantics of [`try_fold()`].
///
/// [`fold_with()`]: #method.fold_with
/// [`try_fold()`]: #method.try_fold
///
/// ```
/// use rayon::prelude::*;
///
/// let bytes = 0..22_u8;
/// let sum = bytes.into_par_iter()
/// .try_fold_with(0_u32, |a: u32, b: u8| a.checked_add(b as u32))
/// .try_reduce(|| 0, u32::checked_add);
///
/// assert_eq!(sum, Some((0..22).sum())); // compare to sequential
/// ```
fn try_fold_with<F, T, R>(self, init: T, fold_op: F) -> TryFoldWith<Self, R, F>
where F: Fn(T, Self::Item) -> R + Sync + Send,
R: Try<Ok = T> + Send,
T: Clone + Send
{
try_fold::try_fold_with(self, init, fold_op)
}

/// Sums up the items in the iterator.
///
/// Note that the order in items will be reduced is not specified,
Expand Down Expand Up @@ -2114,3 +2328,42 @@ pub trait ParallelExtend<T>
/// ```
fn par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>;
}

/// We hide the `Try` trait in a private module, as it's only meant to be a
/// stable clone of the standard library's `Try` trait, as yet unstable.
mod private {
/// Clone of `std::ops::Try`.
///
/// Implementing this trait is not permitted outside of `rayon`.
pub trait Try {
private_decl!{}

type Ok;
type Error;
fn into_result(self) -> Result<Self::Ok, Self::Error>;
fn from_ok(v: Self::Ok) -> Self;
fn from_error(v: Self::Error) -> Self;
}

impl<T> Try for Option<T> {
private_impl!{}

type Ok = T;
type Error = ();

fn into_result(self) -> Result<T, ()> { self.ok_or(()) }
fn from_ok(v: T) -> Self { Some(v) }
fn from_error(_: ()) -> Self { None }
}

impl<T, E> Try for Result<T, E> {
private_impl!{}

type Ok = T;
type Error = E;

fn into_result(self) -> Result<T, E> { self }
fn from_ok(v: T) -> Self { Ok(v) }
fn from_error(v: E) -> Self { Err(v) }
}
}
Loading

0 comments on commit 8ba64ab

Please sign in to comment.