Removed AsyncSeek requirement from page stream #149
Conversation
This method assumes that the reader is already at the start of the column. The underlying private method now skips filtered pages by copying their bytes into a sink (hopefully performance-equivalent to seeking).
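For illustration, here is a minimal, self-contained sketch of that skip-without-seek pattern using the `futures` crate; the function name and signature are made up for the example and are not this crate's API.

```rust
use futures::io::{copy, sink, AsyncRead, AsyncReadExt};

// Drain `read_size` bytes of a filtered page without requiring `AsyncSeek`:
// `take` bounds the read and `sink` discards the copied bytes.
async fn skip_page<R: AsyncRead + Unpin>(reader: &mut R, read_size: u64) -> std::io::Result<()> {
    copy(reader.take(read_size), &mut sink()).await?;
    Ok(())
}
```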
Thanks a lot for the PR and for the idea.

I agree that the API is not ideal, but I am not sure the root cause is the `AsyncSeek` requirement here. If we use `.take`, we will still incur the IO roundtrip, only to discard the result afterwards, right?

Now, most remote blob storages support a byte-range query parameter, which is equivalent to an `AsyncSeek + AsyncRead` call (seek to the start of the range, read for the range).

Have you considered something like this example? The crate `ranged-reader-rs` provides a wrapper to convert the APIs offered by remote blob storage into `AsyncRead + AsyncSeek`, exactly to address this use-case.
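To make the seek/range equivalence above concrete, here is a minimal sketch (not `ranged-reader-rs`' actual API): a seek-then-read over a remote blob maps onto a single HTTP range request.

```rust
// Equivalent of `seek(SeekFrom::Start(offset))` followed by reading exactly
// `len` bytes, expressed as an HTTP Range header (inclusive on both ends).
fn range_header(offset: u64, len: u64) -> String {
    format!("bytes={}-{}", offset, offset + len - 1)
}

fn main() {
    // "seek to byte 4096 and read 1024 bytes" becomes:
    assert_eq!(range_header(4096, 1024), "bytes=4096-5119");
}
```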
```diff
@@ -64,7 +83,7 @@ fn _get_page_stream<R: AsyncRead + AsyncSeek + Unpin + Send>(
     if let Some(data_header) = data_header {
         if !pages_filter(&descriptor, &data_header) {
             // page to be skipped, we still need to seek
-            reader.seek(SeekFrom::Current(read_size)).await?;
+            copy(reader.take(read_size as u64), &mut sink()).await?;
```
won't this still perform a full read of the data?
Yup, but I think `RangedAsyncReader` does this as well (copies the bytes into the internal buffer) or worse (needs to make a new HTTP request for the next read).
It's worth noting that our use-case never filters. If you have a use-case with lots of filtering, where additional HTTP requests are worthwhile because the amount of skipped data is large, would a `get_all_pages_stream` that just uses a read be acceptable?
If I understand correctly, your argument is that it is likely faster to pre-fetch 3 pages, even if the middle one is skipped, than to perform 2 requests (first and third page).

Under that argument, we should not even require `AsyncSeek`, right?
> Under that argument, we should not even require AsyncSeek, right?

Yes, but when I made that argument I was thinking of skipping one or two pages once in a while. Also worth considering is, say, 1000 pages where all of the intermediate pages are filtered (leaving only the first and the last). I could see that case benefiting from a second HTTP request rather than sinkholing the intermediate bytes.
Our existing work already uses a …

Our benchmarks were actually timing out when we tried to use … Here is our …
For reference, this is a topic of significant interest to me. I would be very happy to work this out together. Thanks a lot for taking the time and benching this.

For my understanding, I can read …

Another question: if you are not skipping pages, are you reading page by page so as not to read the whole column chunk into memory? I.e. an alternative here is to not even stream the pages and simply read the whole column chunk.

Again, I agree that this needs to be fixed; I am just trying to get an idea of the use-case / constraints that you are under, to best support this.
Ok, I think I finally understood this PR - sorry for the slow reasoning here. I agree that the core change is beneficial and we should go for it. Specifically, drop the `Seek` requirement and use `copy` and `sink` as this PR does.

I left three comments related to other changes, just to tidy up a bit.
`deserialize_metadata` is helpful for readers that do not implement `AsyncSeek`. Instead, they can independently read the metadata bytes (without seek this generally means two reads: one to get the size of the metadata and another to load it) and then pass those bytes to `deserialize_metadata`.
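As a rough sketch of that two-read pattern (the `fetch_range` helper and the exact `deserialize_metadata` signature are assumptions for the sake of the example, not the crate's actual API):

```rust
// Hypothetical sketch: fetch the Parquet metadata with two plain reads and
// hand the raw bytes to `deserialize_metadata`, no `AsyncSeek` involved.
async fn load_metadata(file_size: u64) -> Result<FileMetaData, Error> {
    // Read 1: the 8-byte footer = 4-byte little-endian metadata length + "PAR1" magic.
    let footer = fetch_range(file_size - 8, 8).await?;
    let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap()) as u64;

    // Read 2: the metadata block sitting immediately before the footer.
    let metadata = fetch_range(file_size - 8 - metadata_len, metadata_len).await?;

    // Positioning happened via the ranged reads above, so a plain byte slice suffices.
    deserialize_metadata(&metadata)
}
```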
Yup, I'm just sharing results here and trying to move the conversation forward. If it turns out the PR is worth closing because of deficiencies in our benchmarks, then that's fine. For example, I was wondering about this argument and thought that …
We're more interested in doing additional page processing in parallel: we load different chunks at the same time, and while waiting for new pages to stream over the network we can assign tasks to work on already-downloaded pages. Reducing the memory footprint is an additional (valuable) feature.
Got it. If you have faster code, then use it :). If you could provide an example, it could be worth adding it to the examples section on reading from S3.
Got it. My thinking so far is that the optimal (CPU-wise) approach when multiple columns of a record need to be available is to join the streams of every column chunk in the row group, using something like:

```rust
async fn _read_colums(
    path: &Path,
    row_group: &RowGroupMetaData,
    fields: &[Field],
    projection: &Option<Vec<usize>>,
    chunk_size: Option<usize>,
) -> Result<Vec<ArrayIter<'static>>, Error> {
    let mut file = BufReader::new(File::open(path).await?);

    // IO-bounded
    trace!("[parquet/read][start]");
    let fields = if let Some(projection) = projection {
        projection.iter().map(|x| fields[*x].clone()).collect()
    } else {
        fields.to_vec()
    };

    // read (IO-bounded) all columns into memory (using a subset of the fields to project)
    let columns = read::read_columns_many_async(&mut file, row_group, fields, chunk_size).await?;
    trace!("[parquet/read][end]");

    Ok(columns)
}

/// # Panic
/// If the iterators are empty
fn deserialize_parallel(
    columns: &mut [Arc<Mutex<ArrayIter<'static>>>],
    names: Vec<String>,
) -> Result<Batch, Error> {
    trace!("[parquet/deserialize][start]");
    // CPU-bounded
    let columns = columns
        .par_iter_mut()
        .zip(names.par_iter())
        .map(|(iter, name)| {
            trace!("[parquet/deserialize/column/{}][start]", name);
            let array = iter.lock().unwrap().next().transpose()?.unwrap();
            trace!("[parquet/deserialize/column/{}][end]", name);
            Ok(array)
        })
        .collect::<Result<Vec<_>, Error>>()?;

    let batch = Chunk {
        arrays: columns.into_iter().collect(),
    };
    trace!("[parquet/deserialize][end][{}]", batch.len());
    Ok(batch)
}
```
But this is only useful when we need all columns from a record in memory, and it requires all (compressed) columns in memory, which may not be feasible for very wide tables. If you can process columns independently, then I agree with you that having an async thread pool fetching the pages and another to process them is optimal. 👍
Thanks a lot for this PR and for the patience and explanation 🙇
When streaming pages over the network it is quite awkward to fulfill the `AsyncSeek` constraint. This PR exposes methods that work nicely with usages that only provide `AsyncRead` (for example, a Rusoto S3 `StreamingBody` with `TryStreamExt::into_async_read`).

`cargo test` passes, but I note there are no tests that exercise the page filtering. A naive attempt to add additional pages in `test_column_async` didn't work. If the lack of tests here is a concern, then I need some guidance on where/how to add them.
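As a minimal, self-contained sketch of the adapter mentioned above (the in-memory `stream::iter` source merely stands in for a real S3 body):

```rust
use futures::io::AsyncReadExt;
use futures::stream::{self, TryStreamExt};

async fn demo() -> std::io::Result<()> {
    // Any `TryStream` of byte chunks with `io::Error` as its error type can be
    // adapted into an `AsyncRead`, so no `AsyncSeek` support is needed.
    let body = stream::iter(vec![
        Ok::<_, std::io::Error>(b"PAR1".to_vec()),
        Ok(b"...page bytes...".to_vec()),
    ]);
    let mut reader = body.into_async_read();

    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    assert!(buf.starts_with(b"PAR1"));
    Ok(())
}
```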