
High CPU usage and bad performance when using par_bridge on a bounded channel #730

Closed
LeoHexspoor opened this issue Mar 6, 2020 · 5 comments

Comments

@LeoHexspoor

Hi,

I'm using rayon 1.3 and rust 1.41.1 on Windows 10.

I've created a bounded channel (capacity 0) to which items are pushed from a separate thread started with thread::spawn. In that thread I do file I/O and parse data; some items parse faster than others, so data is pushed at irregular intervals.

I have the following code to read from the channel:

rayon::ThreadPoolBuilder::new().num_threads(THREAD_COUNT).build_global().unwrap();

let (tx, rx) = mpsc::sync_channel::<TempObj>(0);

//--- snip ---

rx
    .into_iter()
    .par_bridge()
    .map(|t| process_temp_obj(t))
    .for_each(move |r| println!("{}", r));

//--- snip ---

fn process_temp_obj(obj: TempObj) -> String {
    obj.key
}

I have an 8-core CPU in the test system. If I set THREAD_COUNT to 4, I get around 57% CPU usage, while I would expect most threads to be idle, since parsing an item takes more time than printing a message.

When I remove the call to par_bridge() the entire process runs about 50% faster and it consumes a single full CPU core. The actual wall-clock time is 135 seconds without par_bridge() vs 185 seconds with par_bridge().

Somehow with par_bridge() added I have cores maxing out doing seemingly nothing useful and slowing down the entire process. I could understand the process being a bit slower due to overhead of parallelism while I am not doing anything meaningful (yet) in the threads. But I can't explain the CPU usage. Any idea what may be going on here?

@LeoHexspoor
Author

If it helps I have made the example below as a minimal reproducible case of what I am trying to do.

You can toggle the call to par_bridge() and see that with it the CPU usage skyrockets, without it there is almost no CPU usage.

use std::sync::mpsc::{self};
use std::thread;
use std::time::Duration;

use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;

fn main() {
    
    let thread_count: usize = 4;
    rayon::ThreadPoolBuilder::new().num_threads(thread_count).build_global().unwrap();
    
    let (gen_tx, gen_rx) = mpsc::sync_channel::<i32>(0);
    thread::spawn(move || {
        let sleep_duration = Duration::from_millis(1000);
        for i in 1..11 {
            thread::sleep(sleep_duration);
            gen_tx.send(i).unwrap();
        }
    });

    gen_rx
        .into_iter()
        .par_bridge()
        .for_each(|input| println!("Received: {}", input));
}

@cuviper
Member

cuviper commented Mar 9, 2020

I think this is probably an instance of #642, which may be improved when we finally land @nikomatsakis's changes to the sleep/wake algorithm.

It's still going to be suboptimal with par_bridge, just due to the way it tries to batch the iterator. We have to take a mutex lock on the serial iterator input, so it tries to read a lot of items at once to amortize that somewhat. In your case, though, you'll basically be blocking one of rayon's threads while it waits for more items on the iterator.

If you're willing to refactor away from iterator-style processing, you might have better luck ditching the channel and directly using rayon::spawn for each of your slowly generated jobs. You'll still probably want the sleep/wake changes mentioned above, but that should avoid the par_bridge caveat.

@LeoHexspoor
Author

I'm definitely willing to refactor. But my main goal with the bounded channel is that I want my producer to stop producing new items until a thread is free to do the processing. Do you have any pointers on how you would go about doing such a thing using rayon::spawn?

What I understand now is that as long as the processing jobs are slower than my producer, I should not see any real-world negative impact from this. So this probably only manifested because I haven't implemented the logic in my processing steps yet.

Thanks for your help!

@cuviper
Member

cuviper commented Mar 9, 2020

Hmm, another option with a bounded channel is to spawn N jobs that all do the same rx.into_iter().for_each(...). You'd need to use crossbeam-channel to have multiple consumers, but it's broadly considered superior to std::sync::mpsc anyway. The downside then is that it would be blocking all N rayon threads, so they wouldn't have any chance for other work-stealing. At that point, a simpler thread pool or even just N direct threads might be preferable.

If you do still want rayon work stealing, maybe you could flip that rate-limiting to the result side? Have each job send a result or token back on a channel when they're done. This does not need to be a bounded / blocking send. Your producer can start by spawning N jobs, then read a result before spawning each subsequent job.

@LeoHexspoor
Author

I'll take a closer look at crossbeam-channel. I don't mind losing the work stealing and having N direct threads may make everything much simpler. Thanks again!
