
Removed AsyncSeek requirement from page stream #149

Merged

merged 3 commits into jorgecarleitao:main from streaming-wo-seek on Jun 15, 2022

Conversation

medwards
Contributor

When streaming pages over the network it is quite awkward to fulfill the AsyncSeek constraint.

This PR exposes methods that work with readers that only provide AsyncRead (for example, a Rusoto S3 StreamingBody with TryStreamExt::into_async_read).

cargo test passes, but I note there are no tests that exercise the page filtering. A naive attempt to add additional pages in test_column_async didn't work. If the lack of tests here is a concern, then I need some guidance on where/how to add them.
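For context, a rough sketch of the kind of usage this enables (illustrative only: the bucket, key, and byte range are made up, and the final get_page_stream call is left as a comment because its exact signature is not reproduced in this thread):

use futures::TryStreamExt;
use rusoto_core::Region;
use rusoto_s3::{GetObjectRequest, S3Client, S3};

// Illustrative sketch: obtain a plain AsyncRead positioned at the start of a
// column chunk via an S3 range GET, with no AsyncSeek involved.
async fn column_reader_from_s3() -> Result<(), Box<dyn std::error::Error>> {
    let client = S3Client::new(Region::UsEast1);
    let response = client
        .get_object(GetObjectRequest {
            bucket: "my-bucket".to_string(),            // hypothetical
            key: "data.parquet".to_string(),            // hypothetical
            range: Some("bytes=4-1048575".to_string()), // hypothetical column chunk range
            ..Default::default()
        })
        .await?;

    // StreamingBody is a Stream of Bytes. The fully-qualified call picks
    // futures' TryStreamExt::into_async_read (a futures::io::AsyncRead) rather
    // than ByteStream's inherent into_async_read, which yields a tokio reader.
    let reader = TryStreamExt::into_async_read(response.body.unwrap());

    // `reader` now satisfies the relaxed bound (AsyncRead, no AsyncSeek), e.g.:
    // let pages = get_page_stream(&column_meta, &mut reader, vec![], pages_filter).await?;
    let _ = reader;
    Ok(())
}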

This method assumes that the reader is already at the start of the
column. The underlying private method now skips filtered pages by
copying their bytes into a sink (hopefully performance-equivalent to
seeking).
@jorgecarleitao jorgecarleitao added the enhancement New feature or request label Jun 13, 2022
Owner

@jorgecarleitao left a comment

Thanks a lot for the PR and for the idea.

I agree that the API is not ideal, but I am not sure the root cause is the AsyncSeek requirement here. If we use .take, we will still incur the IO roundtrip, only to discard the result afterwards, right?

Now, most remote blob storage services support a byte-range query parameter, which is equivalent to an AsyncSeek + AsyncRead call (seek to the start of the range, then read the length of the range).

Have you considered something like this example? The crate ranged-reader-rs provides a wrapper to convert APIs provided by remote blob storage to AsyncRead + AsyncSeek exactly to address this use-case.
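To spell out that equivalence (an illustrative helper, not code from either crate): a seek to start followed by a read of len bytes maps onto a single range request.

/// Illustrative: the HTTP Range header corresponding to
/// `seek(SeekFrom::Start(start))` followed by reading `len` bytes.
fn range_header(start: u64, len: u64) -> String {
    // HTTP byte ranges are inclusive on both ends.
    format!("bytes={}-{}", start, start + len - 1)
}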

@@ -64,7 +83,7 @@ fn _get_page_stream<R: AsyncRead + AsyncSeek + Unpin + Send>(
         if let Some(data_header) = data_header {
             if !pages_filter(&descriptor, &data_header) {
                 // page to be skipped, we still need to seek
-                reader.seek(SeekFrom::Current(read_size)).await?;
+                copy(reader.take(read_size as u64), &mut sink()).await?;
Owner

won't this still perform a full read of the data?

Contributor Author

Yup, but I think RangedAsyncReader does this as well (copies the bytes into the internal buffer) or worse (needs to make a new HTTP request for the next read).

Contributor Author

It's worth noting that our use-case never filters. If you have a use-case with lots of filtering, where additional HTTP requests are worthwhile because the amount of skipped data is large, would a get_all_pages_stream that just used a plain read be acceptable?

Owner

If I understand correctly, your argument is that it is likely faster to pre-fetch 3 pages even if the middle one is skipped, than performing 2 requests (first and third page).

Under that argument, we should not even require AsyncSeek, right?

Contributor Author

Under that argument, we should not even require AsyncSeek, right?

Yes, but when I made that argument I was thinking of skipping one or two pages once in a while. Also worth considering is, say, 1000 pages where all of the intermediate pages are filtered out (leaving only the first and the last). I could see that case benefiting from a second HTTP request rather than sinkholing the intermediate bytes.

@medwards
Contributor Author

medwards commented Jun 14, 2022

Our existing work already uses an AsyncSeek-providing wrapper (ConDow's RandomAccessReader). In addition to removing the extra code required to support AsyncSeek, we have also observed performance improvements when we pass get_page_stream a reader that has already issued the HTTP request. Wrappers like RangedAsyncReader defer this HTTP request until the first poll_read. We'll play around with RangedAsyncReader, but this will be the third deferred-request reader we have experimented with, so I do not expect much.

@medwards
Contributor Author

Our benchmarks were actually timing out when we tried to use RangedAsyncReader. Local testing reproduced this: loading and printing a single string column of 10M rows finished in 1s with a simple Rusoto body stream plus the new parquet2 functions, but took 100s using the current parquet2 functions with RangedAsyncReader. To be honest this was unexpected; our other implementations do not take more than 5s for this benchmark, and I didn't expect an order of magnitude difference in performance, let alone two orders.

Here is our range_get closure. Looking through rust-s3, I'm pretty sure this does the same thing, and I tried two different ways to load the data into the vec:

// `downloader` is a wrapper around a Rusoto S3 client; `fetch_part` is a
// convenience method that builds the `GetObjectRequest` for you.
let range_get = Box::new(move |start: u64, length: usize| {
    let bucket = bucket.clone();
    let path = path.clone();
    let downloader = downloader.clone();
    Box::pin(async move {
        let bucket = bucket.clone();
        let path = path.clone();
        let downloader = downloader.clone();
        // range GET for bytes [start, start + length)
        let response = downloader
            .fetch_part(bucket, path, start, start + length as u64 - 1)
            .await
            .unwrap();
        // one way: concatenate the body stream into a Vec<u8>
        let data = response
            .body
            .unwrap()
            .map_ok(|b| b.to_vec())
            .try_concat()
            .await?;
        // the other way (commented out): read_exact into a pre-allocated buffer
        //let mut data = vec![0; length];
        //let _ = response.body.unwrap().into_async_read().compat().read_exact(data.as_mut()).await.unwrap();
        Ok(RangeOutput { start, data })
    }) as BoxFuture<'static, std::io::Result<RangeOutput>>
});
async move {
    let reader = RangedAsyncReader::new(self.blob_length as usize, 4 * 1024, range_get);
    Ok(reader)
}
.boxed()

@jorgecarleitao
Owner

For reference, this is a topic of significant interest to me. I would be very happy to work this out together.

Thanks a lot for taking the time and benching this. For my understanding: I read 4 * 1024 (i.e. 4 KiB) in the code you posted. That seems a bit on the low side (I would expect at least 4 MiB). Did you use that same size in the other example / baseline?

Another question: if you are not skipping pages, are you reading page by page so as not to read the whole column chunk into memory? I.e. an alternative here is to not stream the pages at all and simply read the whole column chunk. Again, I agree this needs to be fixed; I am just trying to get an idea of the use-case / constraints you are under to best support this.

Owner

@jorgecarleitao left a comment

Ok, I think I finally understood this PR - sorry for the slow reasoning here. I agree that the core change is beneficial and we should go for it.

Specifically, drop the Seek requirement and use copy and sink as this PR does.
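For anyone skimming the thread later, the skip-by-draining idiom in isolation (a standalone sketch against futures 0.3, not the exact code in the diff):

use futures::io::{copy, sink, AsyncRead, AsyncReadExt};

/// Discard exactly `n` bytes from `reader` without requiring AsyncSeek.
/// Same effect as `seek(SeekFrom::Current(n))`, but it pays the cost of
/// actually pulling the bytes over the wire before throwing them away.
async fn skip_bytes<R: AsyncRead + Unpin>(reader: &mut R, n: u64) -> std::io::Result<()> {
    copy(reader.take(n), &mut sink()).await?;
    Ok(())
}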

I left three comments related to other changes, just to tidy up a bit.

src/read/stream.rs (outdated review thread, resolved)
src/read/stream.rs (outdated review thread, resolved)
src/read/stream.rs (outdated review thread, resolved)
medwards added 2 commits June 15, 2022 17:58
`deserialize_metadata` is helpful for readers that do not implement
`AsyncSeek`. Instead, they can read the metadata bytes independently
(without seek this generally means two reads: one to get the size of the
metadata and another to load it) and then provide those bytes to
`deserialize_metadata`.
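A minimal sketch of that two-read pattern (the fetch closure is hypothetical, standing in for one range read per call; the bytes it returns are what deserialize_metadata would then consume):

use std::io;

// Sketch only: locate the Parquet metadata with two plain reads, no AsyncSeek.
// `fetch(offset, len)` is a hypothetical closure performing a single range read.
async fn fetch_metadata_bytes<F, Fut>(fetch: F, file_size: u64) -> io::Result<Vec<u8>>
where
    F: Fn(u64, usize) -> Fut,
    Fut: std::future::Future<Output = io::Result<Vec<u8>>>,
{
    // Read 1: the 8-byte footer = 4-byte little-endian metadata length + b"PAR1".
    let footer = fetch(file_size - 8, 8).await?;
    if footer.len() != 8 || &footer[4..] != b"PAR1" {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "not a parquet footer"));
    }
    let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap()) as u64;

    // Read 2: the metadata sits immediately before the footer.
    fetch(file_size - 8 - metadata_len, metadata_len as usize).await
}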
@medwards medwards force-pushed the streaming-wo-seek branch from f58246b to 31a4d03 Compare June 15, 2022 16:01
@medwards
Contributor Author

I would be very happy to work this out together.

Yup, I'm just sharing results here and trying to move the conversation forward. If it turns out the PR is worth closing because of some deficiency in our benchmarks, that's fine. For example...

[The minimum read length] seems a bit on the low side (I would expect at least 4 MiB).

I was wondering about this argument and thought that poll_read would largely drive this behaviour. Simple local testing shows that 4000 * 1024 is 10x faster (still much slower than our impls), but 40000 * 1024 is significantly slower so I guess I need to go back and re-read the reader code. Hopefully we can get a reliable benchmark to validate this.

are you reading page by page to not read the whole column chunk into memory?

We're more interested in doing additional page processing in parallel, so we load different chunks at the same time, and while waiting for new pages to stream over the network we can assign tasks to work on already downloaded pages. Reducing memory footprint is an additional (valuable) feature.

@jorgecarleitao
Owner

I was wondering about this argument and thought that poll_read would largely drive this behaviour. Simple local testing shows that 4000 * 1024 is 10x faster (still much slower than our impls), but 40000 * 1024 is significantly slower so I guess I need to go back and re-read the reader code. Hopefully we can get a reliable benchmark to validate this.

Got it. If you have faster code, then use it :). If you could provide an example, it could be worthwhile adding it to the examples section on reading from s3.

We're more interested in doing additional page processing in parallel, so we load different chunks at the same time, and while waiting for new pages to stream over the network we can assign tasks to work on already downloaded pages. Reducing memory footprint is an additional (valuable) feature.

Got it. My thinking so far on this problem is that the optimal approach (CPU-wise), when multiple columns of a record need to be available, is to join the stream of every column chunk of the row group using read_columns_many_async (IO-bounded) and process every column in parallel using rayon (CPU-bounded). Some pseudo-code I have around for this:

async fn _read_colums(
    path: &Path,
    row_group: &RowGroupMetaData,
    fields: &[Field],
    projection: &Option<Vec<usize>>,
    chunk_size: Option<usize>,
) -> Result<Vec<ArrayIter<'static>>, Error> {
    let mut file = BufReader::new(File::open(path).await?);

    // IO-bounded
    trace!("[parquet/read][start]");
    let fields = if let Some(projection) = projection {
        projection.iter().map(|x| fields[*x].clone()).collect()
    } else {
        fields.to_vec()
    };

    // read (IO-bounded) all columns into memory (using a subset of the fields to project)
    let columns = read::read_columns_many_async(&mut file, row_group, fields, chunk_size).await?;
    trace!("[parquet/read][end]");
    Ok(columns)
}

/// # Panic
/// If the iterators are empty
fn deserialize_parallel(
    columns: &mut [Arc<Mutex<ArrayIter<'static>>>],
    names: Vec<String>,
) -> Result<Batch, Error> {
    trace!("[parquet/deserialize][start]");
    // CPU-bounded
    let columns = columns
        .par_iter_mut()
        .zip(names.par_iter())
        .map(|(iter, name)| {
            trace!("[parquet/deserialize/column/{}][start]", name);
            let array = iter.lock().unwrap().next().transpose()?.unwrap();
            trace!("[parquet/deserialize/column/{}][end]", name);
            Ok(array)
        })
        .collect::<Result<Vec<_>, Error>>()?;

    let batch = Chunk {
        arrays: columns.into_iter().collect(),
    };
    trace!("[parquet/deserialize][end][{}]", batch.len());

    Ok(batch)
}

deserialize_parallel is CPU-bounded and runs under tokio-rayon (so, Rayon's thread-pool). _read_colums is IO-bounded and runs on tokio's thread pool. This basically pushes CPU-bounded tasks to a dedicated thread pool to not block the runtime's thread.

But this is only useful when we need all columns from a record in memory, and it requires all (compressed) columns in memory, which may not be feasible for very wide tables. If you can process columns independently, then I agree with you that having an async thread pool fetching the pages and another to process them is optimal. 👍
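To make that split concrete, a minimal, self-contained illustration of the pattern (assuming the tokio-rayon crate's tokio_rayon::spawn helper; the file name and the work inside the closure are stand-ins, not the arrow2 calls above):

use tokio_rayon::spawn;

// IO-bounded work runs on tokio; the CPU-bounded part is pushed onto rayon's
// thread pool so it does not block the async runtime's worker threads.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    // IO-bounded: stand-in for fetching compressed column chunks.
    let compressed = tokio::fs::read("data.bin").await?; // hypothetical input

    // CPU-bounded: stand-in for deserialize_parallel, executed on rayon's pool.
    let checksum = spawn(move || compressed.iter().map(|b| *b as u64).sum::<u64>()).await;

    println!("{checksum}");
    Ok(())
}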

@jorgecarleitao jorgecarleitao changed the title Provide streaming options /wo AsyncSeek Removed AsyncSeek from page stream Jun 15, 2022
@jorgecarleitao jorgecarleitao changed the title Removed AsyncSeek from page stream Removed AsyncSeek requirement from page stream Jun 15, 2022
@jorgecarleitao jorgecarleitao merged commit 34aac65 into jorgecarleitao:main Jun 15, 2022
@jorgecarleitao
Owner

Thanks a lot for this PR and for the patience and explanation 🙇
